Compare commits

...

6 Commits

Author SHA1 Message Date
Joseph Doherty a9ef6d10d4 Issue #34: handle worktree roots in live smoke tests 2026-04-26 20:03:21 -04:00
Joseph Doherty 0f17a1d1d9 Add live MXAccess worker smoke test 2026-04-26 19:58:33 -04:00
dohertj2 160343aff4 Merge pull request #90 from agent-3/issue-43-scaffold-rust-workspace
Issue #43: scaffold Rust workspace
2026-04-26 19:52:33 -04:00
dohertj2 8ef98b8beb Merge pull request #89 from agent-1/issue-39-implement-dotnet-gatewayclient-and-session
Issue #39: implement .NET GatewayClient and session
2026-04-26 19:50:37 -04:00
Joseph Doherty f049d3e603 Merge remote-tracking branch 'origin/main' into agent-3/issue-43-scaffold-rust-workspace 2026-04-26 19:49:30 -04:00
Joseph Doherty ee88f9d647 Scaffold Rust client workspace 2026-04-26 19:47:26 -04:00
24 changed files with 2578 additions and 4 deletions
+1308
View File
File diff suppressed because it is too large Load Diff
+39
View File
@@ -0,0 +1,39 @@
[package]
name = "mxgateway-client"
version = "0.1.0"
edition = "2021"
publish = false
build = "build.rs"
[workspace]
members = ["crates/mxgw-cli"]
resolver = "2"
[workspace.package]
edition = "2021"
version = "0.1.0"
publish = false
[workspace.dependencies]
clap = { version = "4.5.53", features = ["derive"] }
prost = "0.13.5"
prost-types = "0.13.5"
serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.145"
thiserror = "2.0.17"
tokio = { version = "1.48.0", features = ["macros", "rt-multi-thread"] }
tonic = { version = "0.13.1", features = ["transport"] }
tonic-build = "0.13.1"
[dependencies]
prost = { workspace = true }
prost-types = { workspace = true }
thiserror = { workspace = true }
tonic = { workspace = true }
[dev-dependencies]
serde_json = { workspace = true }
tokio = { workspace = true }
[build-dependencies]
tonic-build = { workspace = true }
+53
View File
@@ -0,0 +1,53 @@
# Rust Client Workspace
The Rust client workspace contains the MXAccess Gateway client library, a
test CLI, and scaffold tests for generated contract wiring. The library uses
the shared protobuf inputs documented in
`../../docs/client-proto-generation.md` so the Rust bindings compile against
the same public gateway and worker contracts as the server.
## Layout
```text
clients/rust/
Cargo.toml
build.rs
src/
tests/
crates/mxgw-cli/
```
`build.rs` reads the `.proto` files from
`../../src/MxGateway.Contracts/Protos` and generates `tonic`/`prost` bindings
into Cargo build output. `src/generated.rs` declares the Rust modules that
include those generated files. `src/generated` remains reserved for checked-in
generator output if the crate later changes to source-tree generation.
## Build And Test
Run the Rust workspace checks from `clients/rust`:
```powershell
cargo fmt --all --check
cargo test --workspace
cargo check --workspace
```
The build script uses `protoc` from `PATH` or the Windows path recorded in
`../../docs/toolchain-links.md`.
## CLI
The scaffold CLI exposes version information:
```powershell
cargo run -p mxgw-cli -- version --json
```
Additional commands are implemented with the client/session wrapper work.
## Related Documentation
- [Client Proto Generation](../../docs/client-proto-generation.md)
- [Rust Client Detailed Design](../../docs/clients-rust-design.md)
- [Rust Style Guide](../../docs/style-guides/RustStyleGuide.md)
+59
View File
@@ -0,0 +1,59 @@
use std::env;
use std::error::Error;
use std::path::{Path, PathBuf};
fn main() -> Result<(), Box<dyn Error>> {
configure_protoc();
let manifest_dir = PathBuf::from(env::var("CARGO_MANIFEST_DIR")?);
let repo_root = manifest_dir
.parent()
.and_then(Path::parent)
.ok_or("clients/rust must live two levels below the repository root")?;
let proto_root = repo_root.join("src/MxGateway.Contracts/Protos");
let gateway_proto = proto_root.join("mxaccess_gateway.proto");
let worker_proto = proto_root.join("mxaccess_worker.proto");
let descriptor_path = PathBuf::from(env::var("OUT_DIR")?).join("mxaccessgw-client-v1.protoset");
println!("cargo:rerun-if-changed={}", gateway_proto.display());
println!("cargo:rerun-if-changed={}", worker_proto.display());
tonic_build::configure()
.build_server(false)
.build_client(true)
.file_descriptor_set_path(descriptor_path)
.compile_protos(
&[gateway_proto.as_path(), worker_proto.as_path()],
&[proto_root.as_path()],
)?;
Ok(())
}
fn configure_protoc() {
if env::var_os("PROTOC").is_some() {
return;
}
for candidate in protoc_candidates() {
if candidate.is_file() {
env::set_var("PROTOC", candidate);
return;
}
}
}
fn protoc_candidates() -> Vec<PathBuf> {
let mut candidates = Vec::new();
if cfg!(windows) {
if let Some(local_app_data) = env::var_os("LOCALAPPDATA") {
candidates.push(PathBuf::from(local_app_data).join(
"Microsoft/WinGet/Packages/Google.Protobuf_Microsoft.Winget.Source_8wekyb3d8bbwe/bin/protoc.exe",
));
}
}
candidates.push(PathBuf::from("protoc"));
candidates
}
+14
View File
@@ -0,0 +1,14 @@
[package]
name = "mxgw-cli"
version.workspace = true
edition.workspace = true
publish.workspace = true
[[bin]]
name = "mxgw"
path = "src/main.rs"
[dependencies]
clap = { workspace = true }
mxgateway-client = { path = "../.." }
serde_json = { workspace = true }
+64
View File
@@ -0,0 +1,64 @@
use std::process::ExitCode;
use clap::{Parser, Subcommand};
use mxgateway_client::{CLIENT_VERSION, GATEWAY_PROTOCOL_VERSION, WORKER_PROTOCOL_VERSION};
use serde_json::json;
#[derive(Debug, Parser)]
#[command(name = "mxgw")]
#[command(about = "MXAccess Gateway Rust test CLI")]
struct Cli {
#[command(subcommand)]
command: Command,
}
#[derive(Debug, Subcommand)]
enum Command {
Version {
#[arg(long)]
json: bool,
},
}
fn main() -> ExitCode {
let cli = Cli::parse();
run(cli);
ExitCode::SUCCESS
}
fn run(cli: Cli) {
match cli.command {
Command::Version { json } => print_version(json),
}
}
fn print_version(use_json: bool) {
if use_json {
println!(
"{}",
json!({
"clientVersion": CLIENT_VERSION,
"gatewayProtocolVersion": GATEWAY_PROTOCOL_VERSION,
"workerProtocolVersion": WORKER_PROTOCOL_VERSION,
})
);
return;
}
println!("mxgw {CLIENT_VERSION}");
println!("gateway protocol {GATEWAY_PROTOCOL_VERSION}");
println!("worker protocol {WORKER_PROTOCOL_VERSION}");
}
#[cfg(test)]
mod tests {
use clap::Parser;
use super::Cli;
#[test]
fn parses_version_json_command() {
let parsed = Cli::try_parse_from(["mxgw", "version", "--json"]);
assert!(parsed.is_ok());
}
}
+30
View File
@@ -0,0 +1,30 @@
use std::fmt;
/// API key wrapper that avoids exposing raw credentials in formatted output.
#[derive(Clone, Eq, PartialEq)]
pub struct ApiKey(String);
impl ApiKey {
pub fn new(value: impl Into<String>) -> Self {
Self(value.into())
}
pub fn expose_secret(&self) -> &str {
&self.0
}
}
impl fmt::Debug for ApiKey {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter
.debug_tuple("ApiKey")
.field(&"<redacted>")
.finish()
}
}
impl fmt::Display for ApiKey {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter.write_str("<redacted>")
}
}
+30
View File
@@ -0,0 +1,30 @@
use tonic::transport::Channel;
use crate::error::Error;
use crate::generated::mxaccess_gateway::v1::mx_access_gateway_client::MxAccessGatewayClient;
use crate::options::ClientOptions;
/// Thin owner for the generated gateway client.
pub struct GatewayClient {
inner: MxAccessGatewayClient<Channel>,
}
impl GatewayClient {
pub async fn connect(options: ClientOptions) -> Result<Self, Error> {
let endpoint = Channel::from_shared(options.endpoint().to_owned()).map_err(|source| {
Error::InvalidEndpoint {
endpoint: options.endpoint().to_owned(),
detail: source.to_string(),
}
})?;
let channel = endpoint.connect().await?;
Ok(Self {
inner: MxAccessGatewayClient::new(channel),
})
}
pub fn into_inner(self) -> MxAccessGatewayClient<Channel> {
self.inner
}
}
+13
View File
@@ -0,0 +1,13 @@
use thiserror::Error as ThisError;
#[derive(Debug, ThisError)]
pub enum Error {
#[error("invalid gateway endpoint `{endpoint}`: {detail}")]
InvalidEndpoint { endpoint: String, detail: String },
#[error("gateway transport error: {0}")]
Transport(#[from] tonic::transport::Error),
#[error("gateway status error: {0}")]
Status(#[from] tonic::Status),
}
+15
View File
@@ -0,0 +1,15 @@
pub mod mxaccess_gateway {
pub mod v1 {
#![allow(clippy::large_enum_variant)]
tonic::include_proto!("mxaccess_gateway.v1");
}
}
pub mod mxaccess_worker {
pub mod v1 {
#![allow(clippy::large_enum_variant)]
tonic::include_proto!("mxaccess_worker.v1");
}
}
+21
View File
@@ -0,0 +1,21 @@
//! Rust client scaffold for MXAccess Gateway.
//!
//! The crate compiles generated `tonic` bindings from the shared gateway
//! protobuf contracts and exposes a small handwritten surface for future client
//! implementation work.
pub mod auth;
pub mod client;
pub mod error;
pub mod generated;
pub mod options;
pub mod session;
pub mod value;
pub mod version;
pub use auth::ApiKey;
pub use client::GatewayClient;
pub use error::Error;
pub use options::ClientOptions;
pub use session::Session;
pub use version::{CLIENT_VERSION, GATEWAY_PROTOCOL_VERSION, WORKER_PROTOCOL_VERSION};
+54
View File
@@ -0,0 +1,54 @@
use std::fmt;
use crate::auth::ApiKey;
#[derive(Clone)]
pub struct ClientOptions {
endpoint: String,
api_key: Option<ApiKey>,
plaintext: bool,
}
impl ClientOptions {
pub fn new(endpoint: impl Into<String>) -> Self {
Self {
endpoint: endpoint.into(),
api_key: None,
plaintext: true,
}
}
pub fn with_api_key(mut self, api_key: ApiKey) -> Self {
self.api_key = Some(api_key);
self
}
pub fn endpoint(&self) -> &str {
&self.endpoint
}
pub fn api_key(&self) -> Option<&ApiKey> {
self.api_key.as_ref()
}
pub fn plaintext(&self) -> bool {
self.plaintext
}
}
impl Default for ClientOptions {
fn default() -> Self {
Self::new("http://127.0.0.1:5000")
}
}
impl fmt::Debug for ClientOptions {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter
.debug_struct("ClientOptions")
.field("endpoint", &self.endpoint)
.field("api_key", &self.api_key.as_ref().map(|_| "<redacted>"))
.field("plaintext", &self.plaintext)
.finish()
}
}
+15
View File
@@ -0,0 +1,15 @@
/// Session identifier returned by the gateway.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Session {
id: String,
}
impl Session {
pub fn new(id: impl Into<String>) -> Self {
Self { id: id.into() }
}
pub fn id(&self) -> &str {
&self.id
}
}
+9
View File
@@ -0,0 +1,9 @@
use crate::generated::mxaccess_gateway::v1::MxValue;
pub fn int32_value(value: i32) -> MxValue {
MxValue {
data_type: crate::generated::mxaccess_gateway::v1::MxDataType::Integer as i32,
kind: Some(crate::generated::mxaccess_gateway::v1::mx_value::Kind::Int32Value(value)),
..MxValue::default()
}
}
+3
View File
@@ -0,0 +1,3 @@
pub const CLIENT_VERSION: &str = "0.1.0-dev";
pub const GATEWAY_PROTOCOL_VERSION: u32 = 1;
pub const WORKER_PROTOCOL_VERSION: u32 = 1;
+144
View File
@@ -0,0 +1,144 @@
use std::fs;
use std::path::PathBuf;
use mxgateway_client::generated::mxaccess_gateway::v1::{
mx_command, mx_value, MxCommand, MxCommandKind, MxCommandRequest, MxDataType, MxEvent,
MxEventFamily, MxValue, OpenSessionReply, ProtocolStatusCode, RegisterCommand,
};
use mxgateway_client::{GATEWAY_PROTOCOL_VERSION, WORKER_PROTOCOL_VERSION};
use serde_json::Value;
#[test]
fn generated_golden_fixtures_are_available() {
for fixture_name in [
"open-session-reply.ok.json",
"register-command-request.json",
"on-data-change-event.json",
] {
let fixture = read_fixture(fixture_name);
assert!(
fixture.is_object(),
"{fixture_name} must remain a protobuf JSON object"
);
}
}
#[test]
fn open_session_fixture_matches_protocol_versions() {
let fixture = read_fixture("open-session-reply.ok.json");
let reply = OpenSessionReply {
session_id: string_field(&fixture, "sessionId"),
backend_name: string_field(&fixture, "backendName"),
worker_process_id: i32_field(&fixture, "workerProcessId"),
worker_protocol_version: u32_field(&fixture, "workerProtocolVersion"),
gateway_protocol_version: u32_field(&fixture, "gatewayProtocolVersion"),
protocol_status: Some(
mxgateway_client::generated::mxaccess_gateway::v1::ProtocolStatus {
code: ProtocolStatusCode::Ok as i32,
message: string_field(&fixture["protocolStatus"], "message"),
},
),
..OpenSessionReply::default()
};
assert_eq!(reply.gateway_protocol_version, GATEWAY_PROTOCOL_VERSION);
assert_eq!(reply.worker_protocol_version, WORKER_PROTOCOL_VERSION);
}
#[test]
fn register_fixture_can_build_generated_request() {
let fixture = read_fixture("register-command-request.json");
let command = &fixture["command"];
let request = MxCommandRequest {
session_id: string_field(&fixture, "sessionId"),
client_correlation_id: string_field(&fixture, "clientCorrelationId"),
command: Some(MxCommand {
kind: MxCommandKind::Register as i32,
payload: Some(mx_command::Payload::Register(RegisterCommand {
client_name: string_field(&command["register"], "clientName"),
})),
}),
};
assert_eq!(request.session_id, "session-fixture");
assert_eq!(
request.command.unwrap().kind,
MxCommandKind::Register as i32
);
}
#[test]
fn on_data_change_fixture_can_build_generated_event() {
let fixture = read_fixture("on-data-change-event.json");
let event = MxEvent {
family: MxEventFamily::OnDataChange as i32,
session_id: string_field(&fixture, "sessionId"),
server_handle: i32_field(&fixture, "serverHandle"),
item_handle: i32_field(&fixture, "itemHandle"),
value: Some(MxValue {
data_type: MxDataType::Integer as i32,
variant_type: string_field(&fixture["value"], "variantType"),
kind: Some(mx_value::Kind::Int32Value(i32_field(
&fixture["value"],
"int32Value",
))),
..MxValue::default()
}),
quality: i32_field(&fixture, "quality"),
worker_sequence: u64_field(&fixture, "workerSequence"),
..MxEvent::default()
};
assert_eq!(event.family, MxEventFamily::OnDataChange as i32);
assert_eq!(event.value.unwrap().data_type, MxDataType::Integer as i32);
}
fn read_fixture(name: &str) -> Value {
let path = fixture_root().join(name);
let data = fs::read_to_string(&path).unwrap_or_else(|error| {
panic!("failed to read fixture {}: {error}", path.display());
});
serde_json::from_str(&data).unwrap_or_else(|error| {
panic!("failed to parse fixture {}: {error}", path.display());
})
}
fn fixture_root() -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../proto/fixtures/golden")
}
fn string_field(value: &Value, name: &str) -> String {
value[name]
.as_str()
.unwrap_or_else(|| panic!("missing string field {name}"))
.to_owned()
}
fn i32_field(value: &Value, name: &str) -> i32 {
value[name]
.as_i64()
.unwrap_or_else(|| panic!("missing i32 field {name}"))
.try_into()
.unwrap_or_else(|_| panic!("field {name} does not fit in i32"))
}
fn u32_field(value: &Value, name: &str) -> u32 {
value[name]
.as_u64()
.unwrap_or_else(|| panic!("missing u32 field {name}"))
.try_into()
.unwrap_or_else(|_| panic!("field {name} does not fit in u32"))
}
fn u64_field(value: &Value, name: &str) -> u64 {
if let Some(number) = value[name].as_u64() {
return number;
}
value[name]
.as_str()
.unwrap_or_else(|| panic!("missing u64 field {name}"))
.parse()
.unwrap_or_else(|_| panic!("field {name} does not parse as u64"))
}
+39
View File
@@ -35,6 +35,45 @@ inside the test.
`OpenSession`, `Register`, `AddItem`, `Advise`, one streamed `OnDataChange`
event, and `CloseSession` without loading MXAccess COM.
## Live MXAccess Smoke
`WorkerLiveMxAccessSmokeTests` in `src/MxGateway.IntegrationTests/` composes the
real gRPC service, `SessionManager`, `SessionWorkerClientFactory`,
`WorkerClient`, `WorkerProcessLauncher`, and `MxGateway.Worker.exe`. It is
skipped unless `MXGATEWAY_RUN_LIVE_MXACCESS_TESTS=1` is set because it creates
the installed MXAccess COM object and depends on live provider state.
The live smoke opens a gateway session, launches the x86 worker, runs
`Register`, `AddItem`, and `Advise`, waits a bounded time for one
`OnDataChange`, and closes the session in a `finally` block so the worker gets a
graceful shutdown request even when a command or event assertion fails.
Build the worker before running the smoke:
```bash
dotnet build src/MxGateway.Worker/MxGateway.Worker.csproj -p:Platform=x86
```
Run the smoke explicitly:
```bash
$env:MXGATEWAY_RUN_LIVE_MXACCESS_TESTS = "1"
dotnet test src/MxGateway.IntegrationTests/MxGateway.IntegrationTests.csproj --filter FullyQualifiedName~WorkerLiveMxAccessSmokeTests
```
Optional live smoke variables:
| Variable | Default | Description |
|----------|---------|-------------|
| `MXGATEWAY_LIVE_MXACCESS_WORKER_EXE` | First existing `MxGateway.Worker.exe` under `src/MxGateway.Worker/bin/...` | Worker executable path. Set this when running against a packaged worker or a non-default build output. |
| `MXGATEWAY_LIVE_MXACCESS_ITEM` | `TestChildObject.TestInt` | MXAccess item reference used by `AddItem`. |
| `MXGATEWAY_LIVE_MXACCESS_CLIENT_NAME` | `MxGateway.IntegrationTests` | Client name passed to `Register`. |
| `MXGATEWAY_LIVE_MXACCESS_EVENT_TIMEOUT_SECONDS` | `15` | Maximum wait for the first `OnDataChange`. |
The test output includes session id, worker process id, command status,
HRESULT/status diagnostics, event sequence and handles, close status, and worker
stdout/stderr lines emitted during the run.
## Focused Commands
Run the fake worker tests after changing gateway worker IPC, session startup, or
+15 -4
View File
@@ -111,10 +111,21 @@ The script maps both proto files into the internal Go package
the source `.proto` files do not carry Go-specific `go_package` options. This
keeps language-specific packaging outside the public contract files.
Rust clients should use `tonic-build` or the selected protobuf generator from
the Rust client build script, with generated modules placed under
`clients/rust/src/generated` or included from the build output according to the
client crate design.
Rust clients use `tonic-build` from `clients/rust/build.rs`. The build script
reads the shared `.proto` files and emits generated `tonic`/`prost` modules
into Cargo build output. `clients/rust/src/generated.rs` contains the module
declarations that include those generated files. `clients/rust/src/generated`
remains reserved for checked-in generator output if the crate later changes to
source-tree generation, and handwritten wrapper code stays outside that
directory.
Run the Rust workspace checks from `clients/rust`:
```powershell
cargo fmt --all --check
cargo test --workspace
cargo check --workspace
```
Python clients should use `grpc_tools.protoc` and write generated modules under
`clients/python/src/mxgateway/generated` so imports stay separate from
+8
View File
@@ -807,6 +807,14 @@ tests. `AddItem` uses `TestChildObject.TestInt` by default and accepts an
override through `MXGATEWAY_LIVE_MXACCESS_ITEM`; `AddItem2` uses the captured
parity fixture shape `AddItem2("TestInt", "TestChildObject")`.
`WorkerLiveMxAccessSmokeTests` in `src/MxGateway.IntegrationTests/` uses the
same opt-in variable for the gateway-to-worker live smoke. It launches the x86
worker through `WorkerProcessLauncher`, opens a gateway session, runs
`Register`, `AddItem`, and `Advise`, waits for one `OnDataChange`, and closes
the session. The smoke accepts `MXGATEWAY_LIVE_MXACCESS_WORKER_EXE` for a
non-default worker executable path and
`MXGATEWAY_LIVE_MXACCESS_EVENT_TIMEOUT_SECONDS` for the bounded event wait.
## Initial Implementation Slice
The first worker slice should implement:
@@ -3,10 +3,92 @@ namespace MxGateway.IntegrationTests;
public static class IntegrationTestEnvironment
{
public const string LiveMxAccessVariableName = "MXGATEWAY_RUN_LIVE_MXACCESS_TESTS";
public const string LiveMxAccessWorkerExecutableVariableName = "MXGATEWAY_LIVE_MXACCESS_WORKER_EXE";
public const string LiveMxAccessItemVariableName = "MXGATEWAY_LIVE_MXACCESS_ITEM";
public const string LiveMxAccessClientNameVariableName = "MXGATEWAY_LIVE_MXACCESS_CLIENT_NAME";
public const string LiveMxAccessEventTimeoutSecondsVariableName = "MXGATEWAY_LIVE_MXACCESS_EVENT_TIMEOUT_SECONDS";
public static bool LiveMxAccessTestsEnabled =>
string.Equals(
Environment.GetEnvironmentVariable(LiveMxAccessVariableName),
"1",
StringComparison.Ordinal);
public static string LiveMxAccessItem =>
GetOptionalEnvironmentVariable(
LiveMxAccessItemVariableName,
"TestChildObject.TestInt");
public static string LiveMxAccessClientName =>
GetOptionalEnvironmentVariable(
LiveMxAccessClientNameVariableName,
"MxGateway.IntegrationTests");
public static TimeSpan LiveMxAccessEventTimeout =>
TimeSpan.FromSeconds(GetPositiveIntegerEnvironmentVariable(
LiveMxAccessEventTimeoutSecondsVariableName,
defaultValue: 15));
public static string ResolveLiveMxAccessWorkerExecutablePath()
{
string? configuredPath = Environment.GetEnvironmentVariable(LiveMxAccessWorkerExecutableVariableName);
if (!string.IsNullOrWhiteSpace(configuredPath))
{
return Path.GetFullPath(configuredPath);
}
string repositoryRoot = ResolveRepositoryRoot(AppContext.BaseDirectory);
string[] candidatePaths =
[
Path.Combine(repositoryRoot, "src", "MxGateway.Worker", "bin", "x86", "Debug", "net48", "MxGateway.Worker.exe"),
Path.Combine(repositoryRoot, "src", "MxGateway.Worker", "bin", "Debug", "net48", "MxGateway.Worker.exe"),
Path.Combine(repositoryRoot, "src", "MxGateway.Worker", "bin", "x86", "Release", "net48", "MxGateway.Worker.exe"),
Path.Combine(repositoryRoot, "src", "MxGateway.Worker", "bin", "Release", "net48", "MxGateway.Worker.exe"),
Path.Combine(repositoryRoot, "src", "MxGateway.Worker", "bin", "x86", "Release", "MxGateway.Worker.exe"),
];
return candidatePaths.FirstOrDefault(File.Exists)
?? candidatePaths[0];
}
private static string GetOptionalEnvironmentVariable(
string name,
string defaultValue)
{
string? value = Environment.GetEnvironmentVariable(name);
return string.IsNullOrWhiteSpace(value)
? defaultValue
: value;
}
private static int GetPositiveIntegerEnvironmentVariable(
string name,
int defaultValue)
{
string? value = Environment.GetEnvironmentVariable(name);
if (int.TryParse(value, out int parsed) && parsed > 0)
{
return parsed;
}
return defaultValue;
}
internal static string ResolveRepositoryRoot(string startDirectory)
{
DirectoryInfo? directory = new(startDirectory);
while (directory is not null)
{
if ((Directory.Exists(Path.Combine(directory.FullName, ".git"))
|| File.Exists(Path.Combine(directory.FullName, ".git")))
&& Directory.Exists(Path.Combine(directory.FullName, "src")))
{
return directory.FullName;
}
directory = directory.Parent;
}
return Directory.GetCurrentDirectory();
}
}
@@ -9,4 +9,37 @@ public sealed class IntegrationTestEnvironmentTests
"MXGATEWAY_RUN_LIVE_MXACCESS_TESTS",
IntegrationTestEnvironment.LiveMxAccessVariableName);
}
[Fact]
public void LiveMxAccessWorkerExecutable_UsesDocumentedEnvironmentVariable()
{
Assert.Equal(
"MXGATEWAY_LIVE_MXACCESS_WORKER_EXE",
IntegrationTestEnvironment.LiveMxAccessWorkerExecutableVariableName);
}
[Fact]
public void ResolveRepositoryRoot_AcceptsGitWorktreeFile()
{
string temporaryRoot = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
string nestedDirectory = Path.Combine(temporaryRoot, "tests", "bin");
try
{
Directory.CreateDirectory(nestedDirectory);
Directory.CreateDirectory(Path.Combine(temporaryRoot, "src"));
File.WriteAllText(Path.Combine(temporaryRoot, ".git"), "gitdir: ../.git/worktrees/test");
string repositoryRoot = IntegrationTestEnvironment.ResolveRepositoryRoot(nestedDirectory);
Assert.Equal(temporaryRoot, repositoryRoot);
}
finally
{
if (Directory.Exists(temporaryRoot))
{
Directory.Delete(temporaryRoot, recursive: true);
}
}
}
}
@@ -0,0 +1,12 @@
namespace MxGateway.IntegrationTests;
public sealed class LiveMxAccessFactAttribute : FactAttribute
{
public LiveMxAccessFactAttribute()
{
if (!IntegrationTestEnvironment.LiveMxAccessTestsEnabled)
{
Skip = $"Set {IntegrationTestEnvironment.LiveMxAccessVariableName}=1 to run live MXAccess tests.";
}
}
}
@@ -18,6 +18,7 @@
<ItemGroup>
<ProjectReference Include="..\MxGateway.Contracts\MxGateway.Contracts.csproj" />
<ProjectReference Include="..\MxGateway.Server\MxGateway.Server.csproj" />
</ItemGroup>
</Project>
@@ -0,0 +1,517 @@
using System.Collections.Concurrent;
using System.Diagnostics;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Grpc;
using MxGateway.Server.Metrics;
using MxGateway.Server.Security.Authorization;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
using Xunit.Abstractions;
namespace MxGateway.IntegrationTests;
public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output)
{
private static readonly TimeSpan CommandTimeout = TimeSpan.FromSeconds(15);
private static readonly TimeSpan StreamShutdownTimeout = TimeSpan.FromSeconds(10);
[LiveMxAccessFact]
[Trait("Category", "LiveMxAccess")]
public async Task GatewaySession_WithLiveWorker_RegistersAdvisesStreamsDataAndCloses()
{
string workerExecutablePath = IntegrationTestEnvironment.ResolveLiveMxAccessWorkerExecutablePath();
Assert.True(
File.Exists(workerExecutablePath),
$"Live MXAccess worker executable was not found at {workerExecutablePath}. Build the worker or set {IntegrationTestEnvironment.LiveMxAccessWorkerExecutableVariableName}.");
TestWorkerProcessFactory processFactory = new(output);
await using GatewayServiceFixture fixture = new(workerExecutablePath, processFactory, output);
string? sessionId = null;
RecordingServerStreamWriter<MxEvent>? eventWriter = null;
Task? streamTask = null;
try
{
OpenSessionReply openReply = await fixture.Service.OpenSession(
new OpenSessionRequest
{
ClientSessionName = "live-mxaccess-smoke",
ClientCorrelationId = "live-open",
CommandTimeout = Duration.FromTimeSpan(CommandTimeout),
},
new TestServerCallContext()).ConfigureAwait(false);
sessionId = openReply.SessionId;
output.WriteLine($"OpenSession status={openReply.ProtocolStatus.Code} session={sessionId} worker_pid={openReply.WorkerProcessId}");
Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code);
Assert.True(openReply.WorkerProcessId > 0);
eventWriter = new RecordingServerStreamWriter<MxEvent>();
streamTask = fixture.Service.StreamEvents(
new StreamEventsRequest { SessionId = sessionId },
eventWriter,
new TestServerCallContext());
MxCommandReply registerReply = await fixture.Service.Invoke(
CreateRegisterRequest(sessionId),
new TestServerCallContext()).ConfigureAwait(false);
LogReply("Register", registerReply);
Assert.Equal(ProtocolStatusCode.Ok, registerReply.ProtocolStatus.Code);
Assert.True(registerReply.Register.ServerHandle > 0);
MxCommandReply addItemReply = await fixture.Service.Invoke(
CreateAddItemRequest(sessionId, registerReply.Register.ServerHandle),
new TestServerCallContext()).ConfigureAwait(false);
LogReply("AddItem", addItemReply);
Assert.Equal(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code);
Assert.True(addItemReply.AddItem.ItemHandle > 0);
MxCommandReply adviseReply = await fixture.Service.Invoke(
CreateAdviseRequest(
sessionId,
registerReply.Register.ServerHandle,
addItemReply.AddItem.ItemHandle),
new TestServerCallContext()).ConfigureAwait(false);
LogReply("Advise", adviseReply);
Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code);
MxEvent dataChange = await eventWriter
.WaitForFirstMessageAsync(IntegrationTestEnvironment.LiveMxAccessEventTimeout)
.ConfigureAwait(false);
LogEvent(dataChange);
Assert.Equal(MxEventFamily.OnDataChange, dataChange.Family);
Assert.Equal(sessionId, dataChange.SessionId);
Assert.Equal(registerReply.Register.ServerHandle, dataChange.ServerHandle);
Assert.Equal(addItemReply.AddItem.ItemHandle, dataChange.ItemHandle);
}
finally
{
try
{
if (!string.IsNullOrWhiteSpace(sessionId))
{
await CloseSessionAsync(fixture, sessionId).ConfigureAwait(false);
}
if (streamTask is not null)
{
await streamTask.WaitAsync(StreamShutdownTimeout).ConfigureAwait(false);
}
}
finally
{
await processFactory.WaitForProcessesAsync(StreamShutdownTimeout).ConfigureAwait(false);
}
}
}
private static MxCommandRequest CreateRegisterRequest(string sessionId)
{
return new MxCommandRequest
{
SessionId = sessionId,
ClientCorrelationId = "live-register",
Command = new MxCommand
{
Kind = MxCommandKind.Register,
Register = new RegisterCommand
{
ClientName = IntegrationTestEnvironment.LiveMxAccessClientName,
},
},
};
}
private static MxCommandRequest CreateAddItemRequest(
string sessionId,
int serverHandle)
{
return new MxCommandRequest
{
SessionId = sessionId,
ClientCorrelationId = "live-add-item",
Command = new MxCommand
{
Kind = MxCommandKind.AddItem,
AddItem = new AddItemCommand
{
ServerHandle = serverHandle,
ItemDefinition = IntegrationTestEnvironment.LiveMxAccessItem,
},
},
};
}
private static MxCommandRequest CreateAdviseRequest(
string sessionId,
int serverHandle,
int itemHandle)
{
return new MxCommandRequest
{
SessionId = sessionId,
ClientCorrelationId = "live-advise",
Command = new MxCommand
{
Kind = MxCommandKind.Advise,
Advise = new AdviseCommand
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
},
},
};
}
private async Task CloseSessionAsync(
GatewayServiceFixture fixture,
string sessionId)
{
CloseSessionReply closeReply = await fixture.Service.CloseSession(
new CloseSessionRequest
{
SessionId = sessionId,
ClientCorrelationId = "live-close",
},
new TestServerCallContext()).ConfigureAwait(false);
output.WriteLine($"CloseSession status={closeReply.ProtocolStatus.Code} final_state={closeReply.FinalState}");
}
private void LogReply(
string method,
MxCommandReply reply)
{
output.WriteLine(
$"{method} status={reply.ProtocolStatus.Code} hresult={reply.Hresult} diagnostic={reply.DiagnosticMessage}");
foreach (MxStatusProxy status in reply.Statuses)
{
output.WriteLine(
$"{method} mxstatus success={status.Success} category={status.Category} detail={status.Detail} text={status.DiagnosticText}");
}
}
private void LogEvent(MxEvent dataChange)
{
output.WriteLine(
$"Event family={dataChange.Family} worker_sequence={dataChange.WorkerSequence} server_handle={dataChange.ServerHandle} item_handle={dataChange.ItemHandle} quality={dataChange.Quality}");
output.WriteLine(
$"Event value_type={dataChange.Value?.DataType} raw_status={dataChange.RawStatus}");
}
private sealed class GatewayServiceFixture : IAsyncDisposable
{
private readonly GatewayMetrics _metrics = new();
private readonly SessionRegistry _registry = new();
private readonly ILoggerFactory _loggerFactory;
public GatewayServiceFixture(
string workerExecutablePath,
IWorkerProcessFactory processFactory,
ITestOutputHelper output)
{
IOptions<GatewayOptions> options = Options.Create(CreateOptions(workerExecutablePath));
_loggerFactory = LoggerFactory.Create(builder => builder.AddProvider(new TestOutputLoggerProvider(output)));
WorkerProcessLauncher launcher = new(
options,
processFactory,
new WorkerProcessStartedProbe(),
_metrics);
SessionWorkerClientFactory workerClientFactory = new(
launcher,
options,
_metrics,
_loggerFactory);
SessionManager sessionManager = new(
_registry,
workerClientFactory,
options,
_metrics,
logger: _loggerFactory.CreateLogger<SessionManager>());
MxAccessGrpcMapper mapper = new();
EventStreamService eventStreamService = new(
sessionManager,
options,
mapper,
_metrics,
_loggerFactory.CreateLogger<EventStreamService>());
Service = new MxAccessGatewayService(
sessionManager,
new GatewayRequestIdentityAccessor(),
new MxAccessGrpcRequestValidator(),
mapper,
eventStreamService,
_loggerFactory.CreateLogger<MxAccessGatewayService>());
}
public MxAccessGatewayService Service { get; }
public async ValueTask DisposeAsync()
{
foreach (GatewaySession session in _registry.Snapshot())
{
await session.DisposeAsync().ConfigureAwait(false);
}
_loggerFactory.Dispose();
_metrics.Dispose();
}
private static GatewayOptions CreateOptions(string workerExecutablePath)
{
return new GatewayOptions
{
Worker = new WorkerOptions
{
ExecutablePath = workerExecutablePath,
StartupTimeoutSeconds = 30,
ShutdownTimeoutSeconds = 15,
HeartbeatIntervalSeconds = 5,
HeartbeatGraceSeconds = 15,
MaxMessageBytes = WorkerFrameProtocolOptions.DefaultMaxMessageBytes,
RequiredArchitecture = WorkerArchitecture.X86,
},
Sessions = new SessionOptions
{
DefaultCommandTimeoutSeconds = 15,
MaxSessions = 1,
},
Events = new EventOptions
{
QueueCapacity = 32,
},
};
}
}
private sealed class RecordingServerStreamWriter<T> : IServerStreamWriter<T>
{
private readonly object syncRoot = new();
private readonly TaskCompletionSource<T> firstMessage = new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly List<T> messages = [];
public IReadOnlyList<T> Messages
{
get
{
lock (syncRoot)
{
return messages.ToArray();
}
}
}
public WriteOptions? WriteOptions { get; set; }
public Task WriteAsync(T message)
{
lock (syncRoot)
{
messages.Add(message);
}
firstMessage.TrySetResult(message);
return Task.CompletedTask;
}
public async Task<T> WaitForFirstMessageAsync(TimeSpan timeout)
{
return await firstMessage.Task.WaitAsync(timeout).ConfigureAwait(false);
}
}
private sealed class TestServerCallContext(CancellationToken cancellationToken = default) : ServerCallContext
{
private readonly Metadata requestHeaders = [];
private readonly Metadata responseTrailers = [];
private readonly Dictionary<object, object> userState = [];
private Status status;
private WriteOptions? writeOptions;
protected override string MethodCore => "/mxaccess_gateway.v1.MxAccessGateway/Test";
protected override string HostCore => "localhost";
protected override string PeerCore => "ipv4:127.0.0.1:5000";
protected override DateTime DeadlineCore => DateTime.UtcNow.AddMinutes(1);
protected override Metadata RequestHeadersCore => requestHeaders;
protected override CancellationToken CancellationTokenCore => cancellationToken;
protected override Metadata ResponseTrailersCore => responseTrailers;
protected override Status StatusCore
{
get => status;
set => status = value;
}
protected override WriteOptions? WriteOptionsCore
{
get => writeOptions;
set => writeOptions = value;
}
protected override AuthContext AuthContextCore { get; } = new(
string.Empty,
new Dictionary<string, List<AuthProperty>>(StringComparer.Ordinal));
protected override IDictionary<object, object> UserStateCore => userState;
protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders)
{
return Task.CompletedTask;
}
protected override ContextPropagationToken CreatePropagationTokenCore(
ContextPropagationOptions? options)
{
throw new NotSupportedException();
}
}
private sealed class TestWorkerProcessFactory(ITestOutputHelper output) : IWorkerProcessFactory
{
private readonly ConcurrentBag<TestWorkerProcess> processes = [];
public IWorkerProcess Start(ProcessStartInfo startInfo)
{
startInfo.RedirectStandardError = true;
startInfo.RedirectStandardOutput = true;
startInfo.UseShellExecute = false;
Process process = new()
{
StartInfo = startInfo,
EnableRaisingEvents = true,
};
process.OutputDataReceived += (_, args) => WriteWorkerOutput("stdout", args.Data);
process.ErrorDataReceived += (_, args) => WriteWorkerOutput("stderr", args.Data);
if (!process.Start())
{
process.Dispose();
throw new InvalidOperationException("Worker process failed to start.");
}
process.BeginOutputReadLine();
process.BeginErrorReadLine();
TestWorkerProcess workerProcess = new(process);
processes.Add(workerProcess);
output.WriteLine($"WorkerProcess started pid={workerProcess.Id} path={startInfo.FileName}");
return workerProcess;
}
public async Task WaitForProcessesAsync(TimeSpan timeout)
{
foreach (TestWorkerProcess process in processes)
{
if (process.HasExited)
{
output.WriteLine($"WorkerProcess exited pid={process.Id} exit_code={process.ExitCode}");
continue;
}
using CancellationTokenSource timeoutCancellation = new(timeout);
await process.WaitForExitAsync(timeoutCancellation.Token).ConfigureAwait(false);
output.WriteLine($"WorkerProcess exited pid={process.Id} exit_code={process.ExitCode}");
}
}
private void WriteWorkerOutput(
string streamName,
string? line)
{
if (!string.IsNullOrWhiteSpace(line))
{
output.WriteLine($"worker_{streamName}: {line}");
}
}
}
private sealed class TestWorkerProcess(Process process) : IWorkerProcess
{
public int Id => process.Id;
public bool HasExited => process.HasExited;
public int? ExitCode => process.HasExited ? process.ExitCode : null;
public async ValueTask WaitForExitAsync(CancellationToken cancellationToken)
{
await process.WaitForExitAsync(cancellationToken).ConfigureAwait(false);
}
public void Kill(bool entireProcessTree)
{
process.Kill(entireProcessTree);
}
public void Dispose()
{
process.Dispose();
}
}
private sealed class TestOutputLoggerProvider(ITestOutputHelper output) : ILoggerProvider
{
public ILogger CreateLogger(string categoryName)
{
return new TestOutputLogger(output, categoryName);
}
public void Dispose()
{
}
}
private sealed class TestOutputLogger(
ITestOutputHelper output,
string categoryName) : ILogger
{
public IDisposable? BeginScope<TState>(TState state)
where TState : notnull
{
return null;
}
public bool IsEnabled(LogLevel logLevel)
{
return logLevel >= LogLevel.Information;
}
public void Log<TState>(
LogLevel logLevel,
EventId eventId,
TState state,
Exception? exception,
Func<TState, Exception?, string> formatter)
{
if (!IsEnabled(logLevel))
{
return;
}
output.WriteLine($"{logLevel} {categoryName}: {formatter(state, exception)}");
if (exception is not null)
{
output.WriteLine(exception.ToString());
}
}
}
}