Compare commits

...

9 Commits

Author SHA1 Message Date
Joseph Doherty 1d93e77234 Merge remote-tracking branch 'origin/main' into agent-2/issue-35-parity-fixture-matrix 2026-04-26 20:49:43 -04:00
dohertj2 c032852065 Merge pull request #97 from agent-3/issue-46-implement-python-async-client-values-errors-and-cli
Issue #46: implement Python async client values errors and CLI
2026-04-26 20:50:10 -04:00
Joseph Doherty 0a670eb381 Issue #35: add parity fixture matrix 2026-04-26 20:47:05 -04:00
Joseph Doherty b57662aae7 Issue #46: implement Python async client values errors and CLI 2026-04-26 20:46:18 -04:00
dohertj2 14afb325c3 Merge pull request #96 from agent-1/issue-47-scaffold-java-gradle-build
Issue #47: scaffold Java Gradle build
2026-04-26 20:42:39 -04:00
Joseph Doherty af42891d5a Issue #47: scaffold Java Gradle build 2026-04-26 20:36:27 -04:00
dohertj2 01a51df053 Merge pull request #95 from agent-2/issue-44-implement-rust-client-session-values-errors-and-cli
Issue #44: implement Rust client session values errors and CLI
2026-04-26 20:34:28 -04:00
Joseph Doherty 89a8fb876a Issue #44: implement Rust client session values errors and CLI 2026-04-26 20:30:04 -04:00
dohertj2 c58358fad9 Merge pull request #94 from agent-3/issue-45-scaffold-python-package
Issue #45: scaffold Python package
2026-04-26 20:28:13 -04:00
46 changed files with 72771 additions and 57 deletions
+41
View File
@@ -0,0 +1,41 @@
# Java Client
The Java client workspace contains the Gradle scaffold for the MXAccess Gateway
client library, generated protobuf/gRPC bindings, a test CLI project, and JUnit
tests.
## Layout
```text
clients/java/
settings.gradle
build.gradle
src/main/generated/
mxgateway-client/
mxgateway-cli/
```
`mxgateway-client` generates Java protobuf and gRPC sources from
`../../src/MxGateway.Contracts/Protos`. The Gradle protobuf plugin writes those
generated sources under `src/main/generated`, which matches the client proto
manifest in `../proto/proto-inputs.json`. Do not edit generated files by hand.
`mxgateway-cli` depends on `mxgateway-client` and provides the `mxgw-java`
application entry point used by later CLI implementation work.
## Build And Test
Run the Java checks from `clients/java`:
```powershell
gradle test
```
The build uses the Java 21 Gradle toolchain, compiles generated protobuf/gRPC
code, and runs JUnit 5 tests for the scaffold and CLI entry point.
## Related Documentation
- [Client Proto Generation](../../docs/client-proto-generation.md)
- [Java Client Detailed Design](../../docs/clients-java-design.md)
- [Java Style Guide](../../docs/style-guides/JavaStyleGuide.md)
+38
View File
@@ -0,0 +1,38 @@
plugins {
id 'base'
}
ext {
grpcVersion = '1.76.0'
junitVersion = '5.14.1'
picocliVersion = '4.7.7'
protobufVersion = '4.33.1'
}
subprojects {
group = 'com.dohertylan.mxgateway'
version = '0.1.0'
pluginManager.withPlugin('java') {
java {
toolchain {
languageVersion = JavaLanguageVersion.of(21)
}
}
tasks.withType(JavaCompile).configureEach {
options.encoding = 'UTF-8'
options.release = 21
}
tasks.withType(Test).configureEach {
useJUnitPlatform()
}
dependencies {
testImplementation platform("org.junit:junit-bom:${junitVersion}")
testImplementation 'org.junit.jupiter:junit-jupiter'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
}
}
+12
View File
@@ -0,0 +1,12 @@
plugins {
id 'application'
}
dependencies {
implementation project(':mxgateway-client')
implementation "info.picocli:picocli:${picocliVersion}"
}
application {
mainClass = 'com.dohertylan.mxgateway.cli.MxGatewayCli'
}
@@ -0,0 +1,53 @@
package com.dohertylan.mxgateway.cli;
import com.dohertylan.mxgateway.client.MxGatewayClientVersion;
import java.io.PrintWriter;
import java.util.concurrent.Callable;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Model.CommandSpec;
import picocli.CommandLine.Spec;
@Command(
name = "mxgw-java",
mixinStandardHelpOptions = true,
description = "MXAccess Gateway Java test CLI.",
subcommands = MxGatewayCli.VersionCommand.class)
public final class MxGatewayCli implements Callable<Integer> {
@Spec
private CommandSpec spec;
public static void main(String[] args) {
int exitCode = new CommandLine(new MxGatewayCli()).execute(args);
System.exit(exitCode);
}
public static int execute(PrintWriter out, PrintWriter err, String... args) {
CommandLine commandLine = new CommandLine(new MxGatewayCli());
commandLine.setOut(out);
commandLine.setErr(err);
return commandLine.execute(args);
}
@Override
public Integer call() {
spec.commandLine().usage(spec.commandLine().getOut());
return 0;
}
@Command(name = "version", description = "Prints the Java client scaffold version.")
public static final class VersionCommand implements Callable<Integer> {
@Spec
private CommandSpec spec;
@Override
public Integer call() {
spec.commandLine().getOut().printf(
"mxgateway-java %s gatewayProtocolVersion=%d workerProtocolVersion=%d%n",
MxGatewayClientVersion.clientVersion(),
MxGatewayClientVersion.gatewayProtocolVersion(),
MxGatewayClientVersion.workerProtocolVersion());
return 0;
}
}
}
@@ -0,0 +1,27 @@
package com.dohertylan.mxgateway.cli;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.PrintWriter;
import java.io.StringWriter;
import org.junit.jupiter.api.Test;
final class MxGatewayCliTests {
@Test
void versionCommandPrintsProtocolVersions() {
StringWriter output = new StringWriter();
StringWriter errors = new StringWriter();
int exitCode = MxGatewayCli.execute(
new PrintWriter(output, true),
new PrintWriter(errors, true),
"version");
assertEquals(0, exitCode);
assertEquals("", errors.toString());
assertTrue(output.toString().contains("mxgateway-java 0.1.0"));
assertTrue(output.toString().contains("gatewayProtocolVersion=1"));
assertTrue(output.toString().contains("workerProtocolVersion=1"));
}
}
@@ -0,0 +1,46 @@
plugins {
id 'java-library'
id 'com.google.protobuf'
}
dependencies {
api "com.google.protobuf:protobuf-java:${protobufVersion}"
api "io.grpc:grpc-protobuf:${grpcVersion}"
api "io.grpc:grpc-stub:${grpcVersion}"
implementation "io.grpc:grpc-netty-shaded:${grpcVersion}"
compileOnly 'javax.annotation:javax.annotation-api:1.3.2'
}
sourceSets {
main {
proto {
srcDir rootProject.file('../../src/MxGateway.Contracts/Protos')
include 'mxaccess_gateway.proto'
include 'mxaccess_worker.proto'
}
}
}
protobuf {
protoc {
artifact = "com.google.protobuf:protoc:${protobufVersion}"
}
plugins {
grpc {
artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}"
}
}
generatedFilesBaseDir = rootProject.file('src/main/generated').absolutePath
generateProtoTasks {
all().configureEach {
plugins {
grpc {}
}
}
}
}
@@ -0,0 +1,22 @@
package com.dohertylan.mxgateway.client;
public final class MxGatewayClientVersion {
private static final int GATEWAY_PROTOCOL_VERSION = 1;
private static final int WORKER_PROTOCOL_VERSION = 1;
private static final String CLIENT_VERSION = "0.1.0";
private MxGatewayClientVersion() {
}
public static String clientVersion() {
return CLIENT_VERSION;
}
public static int gatewayProtocolVersion() {
return GATEWAY_PROTOCOL_VERSION;
}
public static int workerProtocolVersion() {
return WORKER_PROTOCOL_VERSION;
}
}
@@ -0,0 +1,29 @@
package com.dohertylan.mxgateway.client;
import static org.junit.jupiter.api.Assertions.assertEquals;
import mxaccess_gateway.v1.MxAccessGatewayGrpc;
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest;
import mxaccess_worker.v1.MxaccessWorker.WorkerEnvelope;
import org.junit.jupiter.api.Test;
final class GeneratedContractSmokeTests {
@Test
void generatedGatewayAndWorkerContractsCompile() {
OpenSessionRequest request = OpenSessionRequest.newBuilder()
.setClientSessionName("junit")
.build();
WorkerEnvelope envelope = WorkerEnvelope.newBuilder()
.setProtocolVersion(MxGatewayClientVersion.workerProtocolVersion())
.build();
assertEquals("junit", request.getClientSessionName());
assertEquals("mxaccess_gateway.v1.MxAccessGateway", MxAccessGatewayGrpc.SERVICE_NAME);
assertEquals(1, envelope.getProtocolVersion());
}
@Test
void javaTwentyOneToolchainRunsTests() {
assertEquals(21, Runtime.version().feature());
}
}
+22
View File
@@ -0,0 +1,22 @@
pluginManagement {
repositories {
gradlePluginPortal()
mavenCentral()
}
plugins {
id 'com.google.protobuf' version '0.9.5'
}
}
dependencyResolutionManagement {
repositoriesMode.set(RepositoriesMode.FAIL_ON_PROJECT_REPOS)
repositories {
mavenCentral()
}
}
rootProject.name = 'mxaccessgw-java'
include 'mxgateway-client'
include 'mxgateway-cli'
@@ -0,0 +1,588 @@
package mxaccess_gateway.v1;
import static io.grpc.MethodDescriptor.generateFullMethodName;
/**
* <pre>
* Public client API for MXAccess sessions hosted by the gateway.
* </pre>
*/
@io.grpc.stub.annotations.GrpcGenerated
public final class MxAccessGatewayGrpc {
private MxAccessGatewayGrpc() {}
public static final java.lang.String SERVICE_NAME = "mxaccess_gateway.v1.MxAccessGateway";
// Static method descriptors that strictly reflect the proto.
private static volatile io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest,
mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply> getOpenSessionMethod;
@io.grpc.stub.annotations.RpcMethod(
fullMethodName = SERVICE_NAME + '/' + "OpenSession",
requestType = mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest.class,
responseType = mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply.class,
methodType = io.grpc.MethodDescriptor.MethodType.UNARY)
public static io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest,
mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply> getOpenSessionMethod() {
io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest, mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply> getOpenSessionMethod;
if ((getOpenSessionMethod = MxAccessGatewayGrpc.getOpenSessionMethod) == null) {
synchronized (MxAccessGatewayGrpc.class) {
if ((getOpenSessionMethod = MxAccessGatewayGrpc.getOpenSessionMethod) == null) {
MxAccessGatewayGrpc.getOpenSessionMethod = getOpenSessionMethod =
io.grpc.MethodDescriptor.<mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest, mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.UNARY)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "OpenSession"))
.setSampledToLocalTracing(true)
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply.getDefaultInstance()))
.setSchemaDescriptor(new MxAccessGatewayMethodDescriptorSupplier("OpenSession"))
.build();
}
}
}
return getOpenSessionMethod;
}
private static volatile io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest,
mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply> getCloseSessionMethod;
@io.grpc.stub.annotations.RpcMethod(
fullMethodName = SERVICE_NAME + '/' + "CloseSession",
requestType = mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest.class,
responseType = mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply.class,
methodType = io.grpc.MethodDescriptor.MethodType.UNARY)
public static io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest,
mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply> getCloseSessionMethod() {
io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest, mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply> getCloseSessionMethod;
if ((getCloseSessionMethod = MxAccessGatewayGrpc.getCloseSessionMethod) == null) {
synchronized (MxAccessGatewayGrpc.class) {
if ((getCloseSessionMethod = MxAccessGatewayGrpc.getCloseSessionMethod) == null) {
MxAccessGatewayGrpc.getCloseSessionMethod = getCloseSessionMethod =
io.grpc.MethodDescriptor.<mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest, mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.UNARY)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "CloseSession"))
.setSampledToLocalTracing(true)
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply.getDefaultInstance()))
.setSchemaDescriptor(new MxAccessGatewayMethodDescriptorSupplier("CloseSession"))
.build();
}
}
}
return getCloseSessionMethod;
}
private static volatile io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest,
mxaccess_gateway.v1.MxaccessGateway.MxCommandReply> getInvokeMethod;
@io.grpc.stub.annotations.RpcMethod(
fullMethodName = SERVICE_NAME + '/' + "Invoke",
requestType = mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest.class,
responseType = mxaccess_gateway.v1.MxaccessGateway.MxCommandReply.class,
methodType = io.grpc.MethodDescriptor.MethodType.UNARY)
public static io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest,
mxaccess_gateway.v1.MxaccessGateway.MxCommandReply> getInvokeMethod() {
io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest, mxaccess_gateway.v1.MxaccessGateway.MxCommandReply> getInvokeMethod;
if ((getInvokeMethod = MxAccessGatewayGrpc.getInvokeMethod) == null) {
synchronized (MxAccessGatewayGrpc.class) {
if ((getInvokeMethod = MxAccessGatewayGrpc.getInvokeMethod) == null) {
MxAccessGatewayGrpc.getInvokeMethod = getInvokeMethod =
io.grpc.MethodDescriptor.<mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest, mxaccess_gateway.v1.MxaccessGateway.MxCommandReply>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.UNARY)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "Invoke"))
.setSampledToLocalTracing(true)
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
mxaccess_gateway.v1.MxaccessGateway.MxCommandReply.getDefaultInstance()))
.setSchemaDescriptor(new MxAccessGatewayMethodDescriptorSupplier("Invoke"))
.build();
}
}
}
return getInvokeMethod;
}
private static volatile io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest,
mxaccess_gateway.v1.MxaccessGateway.MxEvent> getStreamEventsMethod;
@io.grpc.stub.annotations.RpcMethod(
fullMethodName = SERVICE_NAME + '/' + "StreamEvents",
requestType = mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest.class,
responseType = mxaccess_gateway.v1.MxaccessGateway.MxEvent.class,
methodType = io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING)
public static io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest,
mxaccess_gateway.v1.MxaccessGateway.MxEvent> getStreamEventsMethod() {
io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest, mxaccess_gateway.v1.MxaccessGateway.MxEvent> getStreamEventsMethod;
if ((getStreamEventsMethod = MxAccessGatewayGrpc.getStreamEventsMethod) == null) {
synchronized (MxAccessGatewayGrpc.class) {
if ((getStreamEventsMethod = MxAccessGatewayGrpc.getStreamEventsMethod) == null) {
MxAccessGatewayGrpc.getStreamEventsMethod = getStreamEventsMethod =
io.grpc.MethodDescriptor.<mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest, mxaccess_gateway.v1.MxaccessGateway.MxEvent>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "StreamEvents"))
.setSampledToLocalTracing(true)
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
mxaccess_gateway.v1.MxaccessGateway.MxEvent.getDefaultInstance()))
.setSchemaDescriptor(new MxAccessGatewayMethodDescriptorSupplier("StreamEvents"))
.build();
}
}
}
return getStreamEventsMethod;
}
/**
* Creates a new async stub that supports all call types for the service
*/
public static MxAccessGatewayStub newStub(io.grpc.Channel channel) {
io.grpc.stub.AbstractStub.StubFactory<MxAccessGatewayStub> factory =
new io.grpc.stub.AbstractStub.StubFactory<MxAccessGatewayStub>() {
@java.lang.Override
public MxAccessGatewayStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new MxAccessGatewayStub(channel, callOptions);
}
};
return MxAccessGatewayStub.newStub(factory, channel);
}
/**
* Creates a new blocking-style stub that supports all types of calls on the service
*/
public static MxAccessGatewayBlockingV2Stub newBlockingV2Stub(
io.grpc.Channel channel) {
io.grpc.stub.AbstractStub.StubFactory<MxAccessGatewayBlockingV2Stub> factory =
new io.grpc.stub.AbstractStub.StubFactory<MxAccessGatewayBlockingV2Stub>() {
@java.lang.Override
public MxAccessGatewayBlockingV2Stub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new MxAccessGatewayBlockingV2Stub(channel, callOptions);
}
};
return MxAccessGatewayBlockingV2Stub.newStub(factory, channel);
}
/**
* Creates a new blocking-style stub that supports unary and streaming output calls on the service
*/
public static MxAccessGatewayBlockingStub newBlockingStub(
io.grpc.Channel channel) {
io.grpc.stub.AbstractStub.StubFactory<MxAccessGatewayBlockingStub> factory =
new io.grpc.stub.AbstractStub.StubFactory<MxAccessGatewayBlockingStub>() {
@java.lang.Override
public MxAccessGatewayBlockingStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new MxAccessGatewayBlockingStub(channel, callOptions);
}
};
return MxAccessGatewayBlockingStub.newStub(factory, channel);
}
/**
* Creates a new ListenableFuture-style stub that supports unary calls on the service
*/
public static MxAccessGatewayFutureStub newFutureStub(
io.grpc.Channel channel) {
io.grpc.stub.AbstractStub.StubFactory<MxAccessGatewayFutureStub> factory =
new io.grpc.stub.AbstractStub.StubFactory<MxAccessGatewayFutureStub>() {
@java.lang.Override
public MxAccessGatewayFutureStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new MxAccessGatewayFutureStub(channel, callOptions);
}
};
return MxAccessGatewayFutureStub.newStub(factory, channel);
}
/**
* <pre>
* Public client API for MXAccess sessions hosted by the gateway.
* </pre>
*/
public interface AsyncService {
/**
*/
default void openSession(mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest request,
io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply> responseObserver) {
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getOpenSessionMethod(), responseObserver);
}
/**
*/
default void closeSession(mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest request,
io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply> responseObserver) {
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getCloseSessionMethod(), responseObserver);
}
/**
*/
default void invoke(mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest request,
io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.MxCommandReply> responseObserver) {
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getInvokeMethod(), responseObserver);
}
/**
*/
default void streamEvents(mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest request,
io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.MxEvent> responseObserver) {
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getStreamEventsMethod(), responseObserver);
}
}
/**
* Base class for the server implementation of the service MxAccessGateway.
* <pre>
* Public client API for MXAccess sessions hosted by the gateway.
* </pre>
*/
public static abstract class MxAccessGatewayImplBase
implements io.grpc.BindableService, AsyncService {
@java.lang.Override public final io.grpc.ServerServiceDefinition bindService() {
return MxAccessGatewayGrpc.bindService(this);
}
}
/**
* A stub to allow clients to do asynchronous rpc calls to service MxAccessGateway.
* <pre>
* Public client API for MXAccess sessions hosted by the gateway.
* </pre>
*/
public static final class MxAccessGatewayStub
extends io.grpc.stub.AbstractAsyncStub<MxAccessGatewayStub> {
private MxAccessGatewayStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected MxAccessGatewayStub build(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new MxAccessGatewayStub(channel, callOptions);
}
/**
*/
public void openSession(mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest request,
io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply> responseObserver) {
io.grpc.stub.ClientCalls.asyncUnaryCall(
getChannel().newCall(getOpenSessionMethod(), getCallOptions()), request, responseObserver);
}
/**
*/
public void closeSession(mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest request,
io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply> responseObserver) {
io.grpc.stub.ClientCalls.asyncUnaryCall(
getChannel().newCall(getCloseSessionMethod(), getCallOptions()), request, responseObserver);
}
/**
*/
public void invoke(mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest request,
io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.MxCommandReply> responseObserver) {
io.grpc.stub.ClientCalls.asyncUnaryCall(
getChannel().newCall(getInvokeMethod(), getCallOptions()), request, responseObserver);
}
/**
*/
public void streamEvents(mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest request,
io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.MxEvent> responseObserver) {
io.grpc.stub.ClientCalls.asyncServerStreamingCall(
getChannel().newCall(getStreamEventsMethod(), getCallOptions()), request, responseObserver);
}
}
/**
* A stub to allow clients to do synchronous rpc calls to service MxAccessGateway.
* <pre>
* Public client API for MXAccess sessions hosted by the gateway.
* </pre>
*/
public static final class MxAccessGatewayBlockingV2Stub
extends io.grpc.stub.AbstractBlockingStub<MxAccessGatewayBlockingV2Stub> {
private MxAccessGatewayBlockingV2Stub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected MxAccessGatewayBlockingV2Stub build(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new MxAccessGatewayBlockingV2Stub(channel, callOptions);
}
/**
*/
public mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply openSession(mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest request) throws io.grpc.StatusException {
return io.grpc.stub.ClientCalls.blockingV2UnaryCall(
getChannel(), getOpenSessionMethod(), getCallOptions(), request);
}
/**
*/
public mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply closeSession(mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest request) throws io.grpc.StatusException {
return io.grpc.stub.ClientCalls.blockingV2UnaryCall(
getChannel(), getCloseSessionMethod(), getCallOptions(), request);
}
/**
*/
public mxaccess_gateway.v1.MxaccessGateway.MxCommandReply invoke(mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest request) throws io.grpc.StatusException {
return io.grpc.stub.ClientCalls.blockingV2UnaryCall(
getChannel(), getInvokeMethod(), getCallOptions(), request);
}
/**
*/
@io.grpc.ExperimentalApi("https://github.com/grpc/grpc-java/issues/10918")
public io.grpc.stub.BlockingClientCall<?, mxaccess_gateway.v1.MxaccessGateway.MxEvent>
streamEvents(mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest request) {
return io.grpc.stub.ClientCalls.blockingV2ServerStreamingCall(
getChannel(), getStreamEventsMethod(), getCallOptions(), request);
}
}
/**
* A stub to allow clients to do limited synchronous rpc calls to service MxAccessGateway.
* <pre>
* Public client API for MXAccess sessions hosted by the gateway.
* </pre>
*/
public static final class MxAccessGatewayBlockingStub
extends io.grpc.stub.AbstractBlockingStub<MxAccessGatewayBlockingStub> {
private MxAccessGatewayBlockingStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected MxAccessGatewayBlockingStub build(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new MxAccessGatewayBlockingStub(channel, callOptions);
}
/**
*/
public mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply openSession(mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest request) {
return io.grpc.stub.ClientCalls.blockingUnaryCall(
getChannel(), getOpenSessionMethod(), getCallOptions(), request);
}
/**
*/
public mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply closeSession(mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest request) {
return io.grpc.stub.ClientCalls.blockingUnaryCall(
getChannel(), getCloseSessionMethod(), getCallOptions(), request);
}
/**
*/
public mxaccess_gateway.v1.MxaccessGateway.MxCommandReply invoke(mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest request) {
return io.grpc.stub.ClientCalls.blockingUnaryCall(
getChannel(), getInvokeMethod(), getCallOptions(), request);
}
/**
*/
public java.util.Iterator<mxaccess_gateway.v1.MxaccessGateway.MxEvent> streamEvents(
mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest request) {
return io.grpc.stub.ClientCalls.blockingServerStreamingCall(
getChannel(), getStreamEventsMethod(), getCallOptions(), request);
}
}
/**
* A stub to allow clients to do ListenableFuture-style rpc calls to service MxAccessGateway.
* <pre>
* Public client API for MXAccess sessions hosted by the gateway.
* </pre>
*/
public static final class MxAccessGatewayFutureStub
extends io.grpc.stub.AbstractFutureStub<MxAccessGatewayFutureStub> {
private MxAccessGatewayFutureStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected MxAccessGatewayFutureStub build(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new MxAccessGatewayFutureStub(channel, callOptions);
}
/**
*/
public com.google.common.util.concurrent.ListenableFuture<mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply> openSession(
mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest request) {
return io.grpc.stub.ClientCalls.futureUnaryCall(
getChannel().newCall(getOpenSessionMethod(), getCallOptions()), request);
}
/**
*/
public com.google.common.util.concurrent.ListenableFuture<mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply> closeSession(
mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest request) {
return io.grpc.stub.ClientCalls.futureUnaryCall(
getChannel().newCall(getCloseSessionMethod(), getCallOptions()), request);
}
/**
*/
public com.google.common.util.concurrent.ListenableFuture<mxaccess_gateway.v1.MxaccessGateway.MxCommandReply> invoke(
mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest request) {
return io.grpc.stub.ClientCalls.futureUnaryCall(
getChannel().newCall(getInvokeMethod(), getCallOptions()), request);
}
}
private static final int METHODID_OPEN_SESSION = 0;
private static final int METHODID_CLOSE_SESSION = 1;
private static final int METHODID_INVOKE = 2;
private static final int METHODID_STREAM_EVENTS = 3;
private static final class MethodHandlers<Req, Resp> implements
io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
io.grpc.stub.ServerCalls.ServerStreamingMethod<Req, Resp>,
io.grpc.stub.ServerCalls.ClientStreamingMethod<Req, Resp>,
io.grpc.stub.ServerCalls.BidiStreamingMethod<Req, Resp> {
private final AsyncService serviceImpl;
private final int methodId;
MethodHandlers(AsyncService serviceImpl, int methodId) {
this.serviceImpl = serviceImpl;
this.methodId = methodId;
}
@java.lang.Override
@java.lang.SuppressWarnings("unchecked")
public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {
switch (methodId) {
case METHODID_OPEN_SESSION:
serviceImpl.openSession((mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest) request,
(io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply>) responseObserver);
break;
case METHODID_CLOSE_SESSION:
serviceImpl.closeSession((mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest) request,
(io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply>) responseObserver);
break;
case METHODID_INVOKE:
serviceImpl.invoke((mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest) request,
(io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.MxCommandReply>) responseObserver);
break;
case METHODID_STREAM_EVENTS:
serviceImpl.streamEvents((mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest) request,
(io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.MxEvent>) responseObserver);
break;
default:
throw new AssertionError();
}
}
@java.lang.Override
@java.lang.SuppressWarnings("unchecked")
public io.grpc.stub.StreamObserver<Req> invoke(
io.grpc.stub.StreamObserver<Resp> responseObserver) {
switch (methodId) {
default:
throw new AssertionError();
}
}
}
public static final io.grpc.ServerServiceDefinition bindService(AsyncService service) {
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
getOpenSessionMethod(),
io.grpc.stub.ServerCalls.asyncUnaryCall(
new MethodHandlers<
mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest,
mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply>(
service, METHODID_OPEN_SESSION)))
.addMethod(
getCloseSessionMethod(),
io.grpc.stub.ServerCalls.asyncUnaryCall(
new MethodHandlers<
mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest,
mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply>(
service, METHODID_CLOSE_SESSION)))
.addMethod(
getInvokeMethod(),
io.grpc.stub.ServerCalls.asyncUnaryCall(
new MethodHandlers<
mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest,
mxaccess_gateway.v1.MxaccessGateway.MxCommandReply>(
service, METHODID_INVOKE)))
.addMethod(
getStreamEventsMethod(),
io.grpc.stub.ServerCalls.asyncServerStreamingCall(
new MethodHandlers<
mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest,
mxaccess_gateway.v1.MxaccessGateway.MxEvent>(
service, METHODID_STREAM_EVENTS)))
.build();
}
private static abstract class MxAccessGatewayBaseDescriptorSupplier
implements io.grpc.protobuf.ProtoFileDescriptorSupplier, io.grpc.protobuf.ProtoServiceDescriptorSupplier {
MxAccessGatewayBaseDescriptorSupplier() {}
@java.lang.Override
public com.google.protobuf.Descriptors.FileDescriptor getFileDescriptor() {
return mxaccess_gateway.v1.MxaccessGateway.getDescriptor();
}
@java.lang.Override
public com.google.protobuf.Descriptors.ServiceDescriptor getServiceDescriptor() {
return getFileDescriptor().findServiceByName("MxAccessGateway");
}
}
private static final class MxAccessGatewayFileDescriptorSupplier
extends MxAccessGatewayBaseDescriptorSupplier {
MxAccessGatewayFileDescriptorSupplier() {}
}
private static final class MxAccessGatewayMethodDescriptorSupplier
extends MxAccessGatewayBaseDescriptorSupplier
implements io.grpc.protobuf.ProtoMethodDescriptorSupplier {
private final java.lang.String methodName;
MxAccessGatewayMethodDescriptorSupplier(java.lang.String methodName) {
this.methodName = methodName;
}
@java.lang.Override
public com.google.protobuf.Descriptors.MethodDescriptor getMethodDescriptor() {
return getServiceDescriptor().findMethodByName(methodName);
}
}
private static volatile io.grpc.ServiceDescriptor serviceDescriptor;
public static io.grpc.ServiceDescriptor getServiceDescriptor() {
io.grpc.ServiceDescriptor result = serviceDescriptor;
if (result == null) {
synchronized (MxAccessGatewayGrpc.class) {
result = serviceDescriptor;
if (result == null) {
serviceDescriptor = result = io.grpc.ServiceDescriptor.newBuilder(SERVICE_NAME)
.setSchemaDescriptor(new MxAccessGatewayFileDescriptorSupplier())
.addMethod(getOpenSessionMethod())
.addMethod(getCloseSessionMethod())
.addMethod(getInvokeMethod())
.addMethod(getStreamEventsMethod())
.build();
}
}
}
return result;
}
}
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,469 @@
{
"schemaVersion": 1,
"fixtureSet": "mxaccess-gateway-parity-fixture-matrix",
"contractName": "mxaccess-gateway",
"gatewayProtocolVersion": 1,
"workerProtocolVersion": 1,
"sourceCaptureRoot": "C:/Users/dohertj2/Desktop/mxaccess/captures",
"sourceDocs": [
"C:/Users/dohertj2/Desktop/mxaccess/docs/MXAccess-Public-API.md",
"C:/Users/dohertj2/Desktop/mxaccess/docs/Current-Sprint-State.md"
],
"comparisonFormat": {
"description": "Each parity run records the same command against direct MXAccess and the gateway-backed worker, then compares raw parity fields instead of client wrapper behavior.",
"directMxAccess": {
"requiredFields": [
"method",
"arguments",
"returnedValue",
"hresult",
"exceptionType",
"statuses",
"events"
]
},
"gatewayResult": {
"requiredFields": [
"kind",
"protocolStatus",
"returnValue",
"hresult",
"statuses",
"diagnosticMessage",
"events"
]
},
"eventFields": [
"family",
"serverHandle",
"itemHandle",
"value",
"quality",
"sourceTimestamp",
"statuses",
"workerSequence",
"workerTimestamp",
"gatewayReceiveTimestamp",
"hresult",
"rawStatus"
],
"comparisonKeys": [
"hresult",
"exceptionType",
"returnedValue",
"statusArrayShape",
"statusRawFields",
"eventFamilyOrder",
"eventPayloadShape",
"valueProjection",
"rawFallbackMetadata"
]
},
"methodFixtures": [
{
"id": "method.register.basic",
"method": "Register",
"commandKind": "MX_COMMAND_KIND_REGISTER",
"status": "planned_fixture",
"captureReferences": [
"captures/001-register/harness.log",
"captures/047-frida-com-proxy-register/harness.log"
],
"assertions": [
"preserve returned server handle in returnValue and RegisterReply",
"preserve success HRESULT as 0",
"do not emit MXAccess events for register"
]
},
{
"id": "method.unregister.basic",
"method": "Unregister",
"commandKind": "MX_COMMAND_KIND_UNREGISTER",
"status": "planned_fixture",
"captureReferences": [
"captures/001-register/harness.log",
"captures/109-native-post-remove-errors/harness.log"
],
"assertions": [
"preserve void return shape with explicit protocol success",
"preserve HRESULT or COM exception details for invalid server handle",
"close registered handle only after MXAccess succeeds"
]
},
{
"id": "method.add-item.scalar",
"method": "AddItem",
"commandKind": "MX_COMMAND_KIND_ADD_ITEM",
"status": "planned_fixture",
"captureReferences": [
"captures/002-add-remove-scalar/harness.log",
"captures/006-add-invalid/harness.log"
],
"assertions": [
"preserve returned item handle in returnValue and AddItemReply",
"preserve invalid item reference HRESULT/status details",
"do not prevalidate item definition in the gateway"
]
},
{
"id": "method.add-item2.context",
"method": "AddItem2",
"commandKind": "MX_COMMAND_KIND_ADD_ITEM2",
"status": "planned_fixture",
"captureReferences": [
"captures/mxaccess-additem2-testint-context.log",
"captures/121-frida-buffered-history-testhistoryvalue-context/harness.log"
],
"assertions": [
"pass item_definition and item_context exactly as supplied",
"preserve returned item handle in returnValue and AddItem2Reply",
"compare context-bearing reference resolution against direct MXAccess"
]
},
{
"id": "method.remove-item.basic",
"method": "RemoveItem",
"commandKind": "MX_COMMAND_KIND_REMOVE_ITEM",
"status": "planned_fixture",
"captureReferences": [
"captures/002-add-remove-scalar/harness.log",
"captures/109-native-post-remove-errors/harness.log"
],
"assertions": [
"preserve void return shape with explicit protocol success",
"preserve post-remove and invalid-handle HRESULT/status behavior",
"remove diagnostic handle state only after MXAccess succeeds"
]
},
{
"id": "method.advise.supervisory-data-change",
"method": "Advise",
"commandKind": "MX_COMMAND_KIND_ADVISE",
"status": "planned_fixture",
"captureReferences": [
"captures/003-subscribe-scalars/harness.log",
"captures/058-frida-subscribe-testint/harness.log"
],
"assertions": [
"preserve successful command reply shape",
"forward OnDataChange with value, quality, timestamp, and status array",
"preserve per-worker event order"
]
},
{
"id": "method.unadvise.basic",
"method": "UnAdvise",
"commandKind": "MX_COMMAND_KIND_UN_ADVISE",
"status": "planned_fixture",
"captureReferences": [
"captures/058-frida-subscribe-testint/harness.log",
"captures/007-subscribe-invalid/harness.log"
],
"assertions": [
"preserve void return shape with explicit protocol success",
"preserve invalid item handle HRESULT/status behavior",
"do not distinguish plain and supervisory cleanup beyond MXAccess behavior"
]
},
{
"id": "method.advise-supervisory.basic",
"method": "AdviseSupervisory",
"commandKind": "MX_COMMAND_KIND_ADVISE_SUPERVISORY",
"status": "planned_fixture",
"captureReferences": [
"captures/058-frida-subscribe-testint/harness.log",
"captures/105-frida-advise-shortdesc-prebound-fixed/harness.log"
],
"assertions": [
"keep AdviseSupervisory distinct from plain Advise in command kind",
"forward native OnDataChange only when MXAccess emits it",
"compare supervisory item status arrays without normalization"
]
},
{
"id": "method.add-buffered-item.context",
"method": "AddBufferedItem",
"commandKind": "MX_COMMAND_KIND_ADD_BUFFERED_ITEM",
"status": "planned_fixture",
"captureReferences": [
"captures/079-frida-add-buffered-advise-testint/harness.log",
"captures/120-frida-buffered-history-testhistoryvalue/harness.log",
"captures/121-frida-buffered-history-testhistoryvalue-context/harness.log"
],
"assertions": [
"pass item_definition and item_context exactly as supplied",
"preserve returned buffered item handle in returnValue and AddBufferedItemReply",
"keep buffered registration distinct from normal AddItem2"
]
},
{
"id": "method.set-buffered-update-interval.basic",
"method": "SetBufferedUpdateInterval",
"commandKind": "MX_COMMAND_KIND_SET_BUFFERED_UPDATE_INTERVAL",
"status": "planned_fixture",
"captureReferences": [
"captures/mxaccess-set-buffered-interval-1000.log",
"captures/079-frida-add-buffered-advise-testint/harness.log"
],
"assertions": [
"preserve requested update interval without clamping in the gateway",
"preserve void return shape with explicit protocol success",
"compare buffered event cadence only in opt-in live runs"
]
},
{
"id": "method.suspend.scan-state",
"method": "Suspend",
"commandKind": "MX_COMMAND_KIND_SUSPEND",
"status": "planned_fixture",
"captureReferences": [
"captures/077-frida-suspend-advised-scanstate/harness.log",
"captures/118-frida-suspend-advised-scanstate-long/harness.log"
],
"assertions": [
"preserve out MxStatus in SuspendReply and repeated statuses",
"preserve HRESULT separately from status detail",
"do not synthesize OperationComplete if native MXAccess does not raise it"
]
},
{
"id": "method.activate.scan-state",
"method": "Activate",
"commandKind": "MX_COMMAND_KIND_ACTIVATE",
"status": "planned_fixture",
"captureReferences": [
"captures/078-frida-activate-advised-scanstate/harness.log",
"captures/119-frida-activate-advised-scanstate-long/harness.log"
],
"assertions": [
"preserve out MxStatus in ActivateReply and repeated statuses",
"preserve HRESULT separately from status detail",
"do not synthesize OperationComplete if native MXAccess does not raise it"
]
},
{
"id": "method.write.value-status-matrix",
"method": "Write",
"commandKind": "MX_COMMAND_KIND_WRITE",
"status": "planned_fixture",
"captureReferences": [
"captures/023-frida-write-test-int-sequence-109-111/harness.log",
"captures/024-frida-write-test-bool-sequence/harness.log",
"captures/089-frida-write-testint-wrong-type/harness.log",
"captures/090-frida-write-invalid-reference/harness.log",
"captures/107-native-write-testint-current/harness.log"
],
"assertions": [
"preserve scalar and array value projections plus raw fallback metadata",
"preserve wrong-type and invalid-reference HRESULT/status arrays",
"forward OnWriteComplete only when native MXAccess emits it"
]
},
{
"id": "method.write2.timestamped",
"method": "Write2",
"commandKind": "MX_COMMAND_KIND_WRITE2",
"status": "planned_fixture",
"captureReferences": [
"captures/042-frida-write2-test-int-timestamp/harness.log",
"captures/066-frida-write2-test-bool-timestamp/harness.log",
"captures/075-frida-write2-test-datetime-array-timestamp/harness.log"
],
"assertions": [
"preserve timestamp_value as an MXAccess VARIANT projection",
"preserve write value shape and HRESULT/status arrays",
"compare timestamped write completion events against direct MXAccess"
]
},
{
"id": "method.write-secured.rejection-gap",
"method": "WriteSecured",
"commandKind": "MX_COMMAND_KIND_WRITE_SECURED",
"status": "documented_gap",
"captureReferences": [
"captures/036-frida-write-secured-test-int/harness.log",
"captures/111-frida-write-secured-auth-protectedvalue/harness.log",
"captures/112-frida-write-secured-auth-verified-protectedvalue1/harness.log"
],
"assertions": [
"preserve observed 0x80004021 rejection before a value-bearing NMX body",
"preserve current_user_id and verifier_user_id only as command inputs, not logs",
"upgrade this gap to planned_fixture when a successful direct WriteSecured path is observed"
]
},
{
"id": "method.write-secured2.authenticated",
"method": "WriteSecured2",
"commandKind": "MX_COMMAND_KIND_WRITE_SECURED2",
"status": "planned_fixture",
"captureReferences": [
"captures/113-frida-write-secured2-auth-protectedvalue/harness.log",
"captures/116-frida-write-secured2-auth-verified-protectedvalue1/harness.log",
"captures/117-frida-write-secured2-auth-testint/harness.log"
],
"assertions": [
"preserve authenticated timestamped secured write body shape",
"preserve HRESULT/status arrays without logging credential-bearing values",
"do not synthesize OnWriteComplete when direct MXAccess does not emit it"
]
},
{
"id": "method.authenticate-user.basic",
"method": "AuthenticateUser",
"commandKind": "MX_COMMAND_KIND_AUTHENTICATE_USER",
"status": "planned_fixture",
"captureReferences": [
"captures/087-frida-authenticate-administrator-empty/harness.log",
"captures/088-frida-authenticate-invalid-empty/harness.log"
],
"assertions": [
"preserve returned user id in returnValue and AuthenticateUserReply",
"preserve invalid credential HRESULT/status behavior",
"redact verify_user_password from logs and diagnostics"
]
},
{
"id": "method.archestra-user-to-id.basic",
"method": "ArchestrAUserToId",
"commandKind": "MX_COMMAND_KIND_ARCHESTRA_USER_TO_ID",
"status": "planned_fixture",
"captureReferences": [
"captures/mxaccess-user-map-administrator.log",
"captures/mxaccess-user-map-invalid.log"
],
"assertions": [
"preserve returned user id in returnValue and ArchestrAUserToIdReply",
"preserve invalid user GUID HRESULT/status behavior",
"compare raw mapping behavior without normalizing unknown users"
]
}
],
"eventFixtures": [
{
"id": "event.on-data-change.scalar",
"family": "MX_EVENT_FAMILY_ON_DATA_CHANGE",
"status": "planned_fixture",
"captureReferences": [
"captures/003-subscribe-scalars/harness.log",
"captures/106-native-subscribe-testint-current/harness.log"
],
"assertions": [
"preserve value, quality, timestamp, status array, and worker sequence"
]
},
{
"id": "event.on-write-complete.status",
"family": "MX_EVENT_FAMILY_ON_WRITE_COMPLETE",
"status": "planned_fixture",
"captureReferences": [
"captures/008-write-test-int-same-value/harness.log",
"captures/107-native-write-testint-current/harness.log"
],
"assertions": [
"preserve write-complete status array and optional HRESULT"
]
},
{
"id": "event.operation-complete.native-trigger-gap",
"family": "MX_EVENT_FAMILY_OPERATION_COMPLETE",
"status": "documented_gap",
"captureReferences": [
"captures/077-frida-suspend-advised-scanstate/harness.log",
"captures/118-frida-suspend-advised-scanstate-long/harness.log"
],
"assertions": [
"do not synthesize OperationComplete from Write or OnWriteComplete",
"upgrade this gap when a public MXAccess trigger emits event family 3"
]
},
{
"id": "event.on-buffered-data-change.batch-gap",
"family": "MX_EVENT_FAMILY_ON_BUFFERED_DATA_CHANGE",
"status": "documented_gap",
"captureReferences": [
"captures/120-frida-buffered-history-testhistoryvalue/harness.log",
"captures/122-frida-buffered-history-testhistoryvalue-plainadvise/harness.log"
],
"assertions": [
"preserve raw buffered metadata until a public multi-sample event payload is observed",
"upgrade this gap when OnBufferedDataChange batches are captured from MXAccess"
]
}
],
"scenarioGroups": [
{
"id": "invalid_handles",
"description": "Invalid server, item, post-remove, and invalid-reference cases keep MXAccess-owned HRESULT and status behavior.",
"fixtureIds": [
"method.add-item.scalar",
"method.remove-item.basic",
"method.unadvise.basic",
"method.write.value-status-matrix",
"method.unregister.basic"
],
"captureReferences": [
"captures/006-add-invalid/harness.log",
"captures/007-subscribe-invalid/harness.log",
"captures/109-native-post-remove-errors/harness.log",
"captures/110-native-invalid-handle-errors/harness.log"
]
},
{
"id": "write_statuses",
"description": "Write success, wrong type, invalid reference, scalar arrays, and completion-status cases compare HRESULT, status array, value projection, and event shape.",
"fixtureIds": [
"method.write.value-status-matrix",
"method.write2.timestamped",
"event.on-write-complete.status"
],
"captureReferences": [
"captures/089-frida-write-testint-wrong-type/harness.log",
"captures/090-frida-write-invalid-reference/harness.log",
"captures/091-frida-write-testint-double-type/harness.log",
"captures/097-frida-write-bool-array-pattern/harness.log",
"captures/107-native-write-testint-current/harness.log"
]
},
{
"id": "secured_writes",
"description": "Secured writes include observed WriteSecured rejection and authenticated WriteSecured2 success paths without logging credential-bearing values.",
"fixtureIds": [
"method.write-secured.rejection-gap",
"method.write-secured2.authenticated",
"method.authenticate-user.basic"
],
"captureReferences": [
"captures/036-frida-write-secured-test-int/harness.log",
"captures/111-frida-write-secured-auth-protectedvalue/harness.log",
"captures/113-frida-write-secured2-auth-protectedvalue/harness.log",
"captures/117-frida-write-secured2-auth-testint/harness.log"
]
},
{
"id": "add_item_context",
"description": "Context-bearing item registration compares AddItem2 and buffered AddBufferedItem argument preservation.",
"fixtureIds": [
"method.add-item2.context",
"method.add-buffered-item.context"
],
"captureReferences": [
"captures/mxaccess-additem2-testint-context.log",
"captures/121-frida-buffered-history-testhistoryvalue-context/harness.log"
]
},
{
"id": "buffered_registration",
"description": "Buffered registration and interval setup are tracked separately from normal advice until a public buffered data-change batch is captured.",
"fixtureIds": [
"method.add-buffered-item.context",
"method.set-buffered-update-interval.basic",
"event.on-buffered-data-change.batch-gap"
],
"captureReferences": [
"captures/079-frida-add-buffered-advise-testint/harness.log",
"captures/120-frida-buffered-history-testhistoryvalue/harness.log",
"captures/122-frida-buffered-history-testhistoryvalue-plainadvise/harness.log"
]
}
]
}
+56 -6
View File
@@ -1,8 +1,8 @@
# Python Client
The Python client package contains generated MXAccess Gateway protobuf
bindings, the `mxgateway` package scaffold, and the `mxgw-py` test CLI
scaffold. The package uses the shared proto inputs documented in
bindings, the async `mxgateway` package, and the `mxgw-py` test CLI. The
package uses the shared proto inputs documented in
`../../docs/client-proto-generation.md` so gateway and client contracts stay in
sync.
@@ -43,15 +43,65 @@ python -m pytest
python -m pip wheel . --no-deps --wheel-dir "$env:TEMP\mxgateway-python-wheel"
```
The scaffold tests import the generated gateway and worker stubs and exercise
the deterministic CLI version output.
The tests import the generated gateway and worker stubs, run fake async gateway
stubs, verify API key metadata, exercise stream cancellation, load shared value
and command fixtures, and check deterministic CLI output.
## Library Usage
The library is async-first:
```python
from mxgateway import GatewayClient
async with await GatewayClient.connect(
endpoint="localhost:5000",
api_key="mxgw_example",
plaintext=True,
) as client:
session = await client.open_session(client_session_name="python-client")
try:
server_handle = await session.register("python-client")
item_handle = await session.add_item(server_handle, "Object.Attribute")
await session.advise(server_handle, item_handle)
finally:
await session.close()
```
`GatewayClient.open_session_raw`, `GatewayClient.invoke_raw`, and
`GatewayClient.stream_events_raw` keep the generated protobuf replies and
events available for parity tests. `Session` helpers call the method-specific
MXAccess commands and preserve raw replies on typed command exceptions.
Canceling a Python task cancels the client-side gRPC call or stream wait. It
does not abort an in-flight MXAccess COM call inside the worker process.
## Authentication And TLS
`ClientOptions.api_key` adds this metadata to unary calls and streams:
```text
authorization: Bearer <api-key>
```
The client supports plaintext channels for local development, TLS with system
roots, TLS with a custom `ca_file`, and an optional test server name override.
API keys are redacted from option repr output and CLI error output.
## CLI
The scaffold CLI exposes version information:
The CLI emits deterministic JSON for automation:
```powershell
mxgw-py version --json
mxgw-py open-session --endpoint localhost:5000 --plaintext --json
mxgw-py register --session-id <id> --client-name python-client --json
mxgw-py add-item --session-id <id> --server-handle 1 --item Object.Attribute --json
mxgw-py advise --session-id <id> --server-handle 1 --item-handle 2 --json
mxgw-py stream-events --session-id <id> --max-events 1 --json
mxgw-py write --session-id <id> --server-handle 1 --item-handle 2 --type int32 --value 123 --json
```
Additional commands are implemented with the async client/session wrapper work.
Use `--api-key` or `--api-key-env MXGATEWAY_API_KEY` to attach API key
metadata. `smoke` opens a session, registers, adds an item, advises, streams a
bounded event count, and closes the session in a `finally` block.
+34 -1
View File
@@ -1,5 +1,38 @@
"""MXAccess Gateway Python client package."""
from .auth import ApiKey, auth_metadata
from .client import GatewayClient
from .errors import (
MxAccessError,
MxGatewayAuthenticationError,
MxGatewayAuthorizationError,
MxGatewayCommandError,
MxGatewayError,
MxGatewaySessionError,
MxGatewayTransportError,
MxGatewayWorkerError,
)
from .options import ClientOptions
from .session import Session
from .values import MxValueView, from_mx_value, to_mx_value
from .version import __version__
__all__ = ["__version__"]
__all__ = [
"ApiKey",
"ClientOptions",
"GatewayClient",
"MxAccessError",
"MxGatewayAuthenticationError",
"MxGatewayAuthorizationError",
"MxGatewayCommandError",
"MxGatewayError",
"MxGatewaySessionError",
"MxGatewayTransportError",
"MxGatewayWorkerError",
"MxValueView",
"Session",
"__version__",
"auth_metadata",
"from_mx_value",
"to_mx_value",
]
+58
View File
@@ -0,0 +1,58 @@
"""Authentication metadata helpers for MXAccess Gateway clients."""
from collections.abc import Sequence
from dataclasses import dataclass
AUTHORIZATION_HEADER = "authorization"
REDACTED = "[redacted]"
@dataclass(frozen=True)
class ApiKey:
"""API key wrapper that avoids leaking the secret through repr output."""
value: str
def __post_init__(self) -> None:
if not self.value:
raise ValueError("api_key must not be empty")
def __repr__(self) -> str:
return f"{type(self).__name__}({REDACTED!r})"
def bearer_value(self) -> str:
return f"Bearer {self.value}"
def auth_metadata(api_key: str | ApiKey | None) -> tuple[tuple[str, str], ...]:
"""Return gRPC metadata for API key auth."""
if api_key is None:
return ()
key = api_key.value if isinstance(api_key, ApiKey) else api_key
if not key:
return ()
return ((AUTHORIZATION_HEADER, f"Bearer {key}"),)
def merge_metadata(
api_key: str | ApiKey | None,
metadata: Sequence[tuple[str, str]] | None = None,
) -> tuple[tuple[str, str], ...]:
"""Merge caller metadata with API key metadata."""
merged = list(metadata or ())
merged.extend(auth_metadata(api_key))
return tuple(merged)
def redact_secret(text: str, secrets: Sequence[str | None]) -> str:
"""Replace known secret values with a stable redaction marker."""
redacted = text
for secret in secrets:
if secret:
redacted = redacted.replace(secret, REDACTED)
return redacted
+165
View File
@@ -0,0 +1,165 @@
"""Async MXAccess Gateway client wrapper."""
from __future__ import annotations
import asyncio
from collections.abc import AsyncIterator, Sequence
from typing import Any
import grpc
from .auth import merge_metadata
from .errors import ensure_protocol_success, map_rpc_error
from .generated import mxaccess_gateway_pb2 as pb
from .generated import mxaccess_gateway_pb2_grpc as pb_grpc
from .options import ClientOptions, create_channel
class GatewayClient:
"""Async client for the public MXAccess Gateway gRPC API."""
def __init__(
self,
*,
options: ClientOptions,
stub: Any,
channel: grpc.aio.Channel | None = None,
) -> None:
self.options = options
self.raw_stub = stub
self._channel = channel
self._closed = False
@classmethod
async def connect(
cls,
options: ClientOptions | None = None,
*,
endpoint: str | None = None,
api_key: str | None = None,
plaintext: bool = False,
ca_file: str | None = None,
server_name_override: str | None = None,
stub: Any | None = None,
) -> "GatewayClient":
"""Create a client with either a real async channel or a supplied fake stub."""
resolved = options or ClientOptions(
endpoint=endpoint or "",
api_key=api_key,
plaintext=plaintext,
ca_file=ca_file,
server_name_override=server_name_override,
)
if stub is not None:
return cls(options=resolved, stub=stub)
channel = create_channel(resolved)
return cls(
options=resolved,
stub=pb_grpc.MxAccessGatewayStub(channel),
channel=channel,
)
async def __aenter__(self) -> "GatewayClient":
return self
async def __aexit__(self, *_exc_info: object) -> None:
await self.close()
async def close(self) -> None:
"""Close the owned gRPC channel."""
if self._closed:
return
self._closed = True
if self._channel is not None:
await self._channel.close()
async def open_session(
self,
request: pb.OpenSessionRequest | None = None,
*,
requested_backend: str = "",
client_session_name: str = "",
client_correlation_id: str = "",
) -> "Session":
"""Open a gateway session and return a high-level session wrapper."""
from .session import Session
raw_request = request or pb.OpenSessionRequest(
requested_backend=requested_backend,
client_session_name=client_session_name,
client_correlation_id=client_correlation_id,
)
reply = await self.open_session_raw(raw_request)
return Session(client=self, session_id=reply.session_id, open_reply=reply)
async def open_session_raw(self, request: pb.OpenSessionRequest) -> pb.OpenSessionReply:
reply = await self._unary("open session", self.raw_stub.OpenSession, request)
ensure_protocol_success("open session", reply.protocol_status, reply)
return reply
async def close_session_raw(
self,
request: pb.CloseSessionRequest,
) -> pb.CloseSessionReply:
reply = await self._unary("close session", self.raw_stub.CloseSession, request)
ensure_protocol_success("close session", reply.protocol_status, reply)
return reply
async def invoke_raw(self, request: pb.MxCommandRequest) -> pb.MxCommandReply:
reply = await self._unary("invoke", self.raw_stub.Invoke, request)
ensure_protocol_success("invoke", reply.protocol_status, reply)
return reply
def stream_events_raw(
self,
request: pb.StreamEventsRequest,
*,
metadata: Sequence[tuple[str, str]] | None = None,
) -> AsyncIterator[pb.MxEvent]:
"""Return an async event iterator and cancel the stream when iteration stops."""
call = self.raw_stub.StreamEvents(
request,
metadata=merge_metadata(self.options.api_key, metadata),
)
return _canceling_iterator(call)
async def _unary(
self,
operation: str,
method: Any,
request: Any,
*,
metadata: Sequence[tuple[str, str]] | None = None,
) -> Any:
call = method(
request,
metadata=merge_metadata(self.options.api_key, metadata),
)
try:
return await call
except asyncio.CancelledError:
cancel = getattr(call, "cancel", None)
if cancel is not None:
cancel()
raise
except grpc.RpcError as error:
raise map_rpc_error(operation, error) from error
async def _canceling_iterator(call: Any) -> AsyncIterator[pb.MxEvent]:
try:
async for event in call:
yield event
except grpc.RpcError as error:
raise map_rpc_error("stream events", error) from error
finally:
cancel = getattr(call, "cancel", None)
if cancel is not None:
cancel()
+157
View File
@@ -0,0 +1,157 @@
"""Typed exception model for MXAccess Gateway Python clients."""
from __future__ import annotations
from typing import Any
import grpc
from .generated import mxaccess_gateway_pb2 as pb
class MxGatewayError(Exception):
"""Base class for client wrapper errors."""
def __init__(
self,
message: str,
*,
protocol_status: pb.ProtocolStatus | None = None,
raw_reply: Any | None = None,
) -> None:
super().__init__(message)
self.protocol_status = protocol_status
self.raw_reply = raw_reply
class MxGatewayTransportError(MxGatewayError):
"""Transport-level gRPC failure."""
class MxGatewayAuthenticationError(MxGatewayTransportError):
"""Authentication failure reported by gRPC."""
class MxGatewayAuthorizationError(MxGatewayTransportError):
"""Authorization failure reported by gRPC."""
class MxGatewaySessionError(MxGatewayError):
"""Gateway session failure."""
class MxGatewayWorkerError(MxGatewayError):
"""Gateway worker process or protocol failure."""
class MxGatewayCommandError(MxGatewayError):
"""Command failure that preserves the raw protobuf reply."""
class MxAccessError(MxGatewayCommandError):
"""MXAccess HRESULT or status failure."""
def map_rpc_error(operation: str, error: grpc.RpcError) -> MxGatewayTransportError:
"""Map a generated gRPC exception to the client exception hierarchy."""
code = error.code() if hasattr(error, "code") else None
details = error.details() if hasattr(error, "details") else str(error)
message = f"{operation} failed: {details}"
if code == grpc.StatusCode.UNAUTHENTICATED:
return MxGatewayAuthenticationError(message)
if code == grpc.StatusCode.PERMISSION_DENIED:
return MxGatewayAuthorizationError(message)
return MxGatewayTransportError(message)
def ensure_protocol_success(
operation: str,
protocol_status: pb.ProtocolStatus | None,
raw_reply: Any | None = None,
) -> Any | None:
"""Raise typed gateway errors for non-OK protocol statuses."""
code = (
protocol_status.code
if protocol_status is not None
else pb.PROTOCOL_STATUS_CODE_UNSPECIFIED
)
if code in (
pb.PROTOCOL_STATUS_CODE_OK,
pb.PROTOCOL_STATUS_CODE_MXACCESS_FAILURE,
):
return raw_reply
message_text = protocol_status.message if protocol_status else ""
message = f"{operation} failed: {message_text or pb.ProtocolStatusCode.Name(code)}"
if code in (
pb.PROTOCOL_STATUS_CODE_SESSION_NOT_FOUND,
pb.PROTOCOL_STATUS_CODE_SESSION_NOT_READY,
):
raise MxGatewaySessionError(
message,
protocol_status=protocol_status,
raw_reply=raw_reply,
)
if code in (
pb.PROTOCOL_STATUS_CODE_WORKER_UNAVAILABLE,
pb.PROTOCOL_STATUS_CODE_TIMEOUT,
pb.PROTOCOL_STATUS_CODE_CANCELED,
pb.PROTOCOL_STATUS_CODE_PROTOCOL_VIOLATION,
):
raise MxGatewayWorkerError(
message,
protocol_status=protocol_status,
raw_reply=raw_reply,
)
raise MxGatewayCommandError(
message,
protocol_status=protocol_status,
raw_reply=raw_reply,
)
def ensure_mxaccess_success(operation: str, reply: pb.MxCommandReply) -> pb.MxCommandReply:
"""Raise `MxAccessError` when MXAccess returned HRESULT or status failure."""
status = reply.protocol_status
if status.code == pb.PROTOCOL_STATUS_CODE_MXACCESS_FAILURE:
raise MxAccessError(
_mxaccess_message(operation, reply),
protocol_status=status,
raw_reply=reply,
)
if reply.HasField("hresult") and reply.hresult < 0:
raise MxAccessError(
_mxaccess_message(operation, reply),
protocol_status=status,
raw_reply=reply,
)
for mx_status in reply.statuses:
if mx_status.success == 0:
raise MxAccessError(
_mxaccess_message(operation, reply),
protocol_status=status,
raw_reply=reply,
)
return reply
def _mxaccess_message(operation: str, reply: pb.MxCommandReply) -> str:
status_text = reply.protocol_status.message or "MXAccess command failed"
hresult = reply.hresult if reply.HasField("hresult") else None
return (
f"{operation} failed: {status_text}; "
f"session={reply.session_id}; correlation={reply.correlation_id}; "
f"hresult={hresult}; statuses={len(reply.statuses)}"
)
+59
View File
@@ -0,0 +1,59 @@
"""Client connection options for the async Python wrapper."""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
import grpc
from .auth import REDACTED, ApiKey
@dataclass(frozen=True)
class ClientOptions:
"""Connection settings for `GatewayClient.connect`."""
endpoint: str
api_key: str | ApiKey | None = None
plaintext: bool = False
ca_file: str | None = None
server_name_override: str | None = None
def __post_init__(self) -> None:
if not self.endpoint:
raise ValueError("endpoint must not be empty")
if self.plaintext and self.ca_file:
raise ValueError("ca_file cannot be used with plaintext connections")
def __repr__(self) -> str:
api_key = REDACTED if self.api_key else None
return (
f"{type(self).__name__}(endpoint={self.endpoint!r}, "
f"api_key={api_key!r}, plaintext={self.plaintext!r}, "
f"ca_file={self.ca_file!r}, "
f"server_name_override={self.server_name_override!r})"
)
def create_channel(options: ClientOptions) -> grpc.aio.Channel:
"""Create a plaintext or TLS `grpc.aio` channel from client options."""
channel_options: list[tuple[str, str]] = []
if options.server_name_override:
channel_options.append(("grpc.ssl_target_name_override", options.server_name_override))
if options.plaintext:
return grpc.aio.insecure_channel(options.endpoint, options=channel_options)
root_certificates = None
if options.ca_file:
root_certificates = Path(options.ca_file).read_bytes()
credentials = grpc.ssl_channel_credentials(root_certificates=root_certificates)
return grpc.aio.secure_channel(
options.endpoint,
credentials,
options=channel_options,
)
+209
View File
@@ -0,0 +1,209 @@
"""Async session wrapper for MXAccess Gateway commands."""
from __future__ import annotations
from collections.abc import AsyncIterator
from .errors import ensure_mxaccess_success
from .generated import mxaccess_gateway_pb2 as pb
from .values import MxValueInput, to_mx_value
class Session:
"""A single gateway-backed MXAccess session."""
def __init__(
self,
*,
client: "GatewayClient",
session_id: str,
open_reply: pb.OpenSessionReply | None = None,
) -> None:
self.client = client
self.session_id = session_id
self.open_reply = open_reply
self._closed = False
async def __aenter__(self) -> "Session":
return self
async def __aexit__(self, *_exc_info: object) -> None:
await self.close()
async def close(self, *, client_correlation_id: str = "") -> pb.CloseSessionReply:
"""Close the gateway session. Repeated calls return a local closed reply."""
if self._closed:
return pb.CloseSessionReply(
session_id=self.session_id,
final_state=pb.SESSION_STATE_CLOSED,
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
)
self._closed = True
return await self.client.close_session_raw(
pb.CloseSessionRequest(
session_id=self.session_id,
client_correlation_id=client_correlation_id,
),
)
async def invoke(self, command: pb.MxCommand, *, correlation_id: str = "") -> pb.MxCommandReply:
"""Invoke a raw command and enforce gateway and MXAccess success."""
reply = await self.invoke_raw(command, correlation_id=correlation_id)
return ensure_mxaccess_success("invoke", reply)
async def invoke_raw(
self,
command: pb.MxCommand,
*,
correlation_id: str = "",
) -> pb.MxCommandReply:
"""Invoke a raw command and preserve the raw reply."""
return await self.client.invoke_raw(
pb.MxCommandRequest(
session_id=self.session_id,
client_correlation_id=correlation_id,
command=command,
),
)
async def register(self, client_name: str, *, correlation_id: str = "") -> int:
reply = await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_REGISTER,
register=pb.RegisterCommand(client_name=client_name),
),
correlation_id=correlation_id,
)
return reply.register.server_handle
async def unregister(self, server_handle: int, *, correlation_id: str = "") -> None:
await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_UNREGISTER,
unregister=pb.UnregisterCommand(server_handle=server_handle),
),
correlation_id=correlation_id,
)
async def add_item(
self,
server_handle: int,
item_definition: str,
*,
correlation_id: str = "",
) -> int:
reply = await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_ADD_ITEM,
add_item=pb.AddItemCommand(
server_handle=server_handle,
item_definition=item_definition,
),
),
correlation_id=correlation_id,
)
return reply.add_item.item_handle
async def add_item2(
self,
server_handle: int,
item_definition: str,
item_context: str,
*,
correlation_id: str = "",
) -> int:
reply = await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_ADD_ITEM2,
add_item2=pb.AddItem2Command(
server_handle=server_handle,
item_definition=item_definition,
item_context=item_context,
),
),
correlation_id=correlation_id,
)
return reply.add_item2.item_handle
async def advise(
self,
server_handle: int,
item_handle: int,
*,
correlation_id: str = "",
) -> None:
await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_ADVISE,
advise=pb.AdviseCommand(
server_handle=server_handle,
item_handle=item_handle,
),
),
correlation_id=correlation_id,
)
async def write(
self,
server_handle: int,
item_handle: int,
value: MxValueInput,
*,
user_id: int = 0,
correlation_id: str = "",
) -> None:
await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_WRITE,
write=pb.WriteCommand(
server_handle=server_handle,
item_handle=item_handle,
value=to_mx_value(value),
user_id=user_id,
),
),
correlation_id=correlation_id,
)
async def write2(
self,
server_handle: int,
item_handle: int,
value: MxValueInput,
timestamp_value: MxValueInput,
*,
user_id: int = 0,
correlation_id: str = "",
) -> None:
await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_WRITE2,
write2=pb.Write2Command(
server_handle=server_handle,
item_handle=item_handle,
value=to_mx_value(value),
timestamp_value=to_mx_value(timestamp_value),
user_id=user_id,
),
),
correlation_id=correlation_id,
)
def stream_events(
self,
*,
after_worker_sequence: int = 0,
) -> AsyncIterator[pb.MxEvent]:
return self.client.stream_events_raw(
pb.StreamEventsRequest(
session_id=self.session_id,
after_worker_sequence=after_worker_sequence,
),
)
from .client import GatewayClient # noqa: E402
+234
View File
@@ -0,0 +1,234 @@
"""MXAccess value conversion helpers."""
from __future__ import annotations
from collections.abc import Sequence
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any
from google.protobuf.timestamp_pb2 import Timestamp
from .generated import mxaccess_gateway_pb2 as pb
MxValueInput = bool | int | float | str | datetime | bytes | None | Sequence[Any]
@dataclass(frozen=True)
class MxValueView:
"""Typed projection of a raw `MxValue` protobuf message."""
value: Any
kind: str
raw: pb.MxValue
def to_mx_value(value: MxValueInput, *, data_type: str | None = None) -> pb.MxValue:
"""Convert a Python value into the public protobuf `MxValue` union."""
if isinstance(value, pb.MxValue):
return value
if value is None:
return pb.MxValue(
data_type=pb.MX_DATA_TYPE_NO_DATA,
variant_type="VT_EMPTY",
is_null=True,
raw_data_type=pb.MX_DATA_TYPE_NO_DATA,
)
if isinstance(value, bool):
return pb.MxValue(
data_type=_data_type(data_type, pb.MX_DATA_TYPE_BOOLEAN),
variant_type="VT_BOOL",
bool_value=value,
)
if isinstance(value, int):
if -(2**31) <= value <= (2**31 - 1):
return pb.MxValue(
data_type=_data_type(data_type, pb.MX_DATA_TYPE_INTEGER),
variant_type="VT_I4",
int32_value=value,
)
return pb.MxValue(
data_type=_data_type(data_type, pb.MX_DATA_TYPE_INTEGER),
variant_type="VT_I8",
int64_value=value,
)
if isinstance(value, float):
return pb.MxValue(
data_type=_data_type(data_type, pb.MX_DATA_TYPE_DOUBLE),
variant_type="VT_R8",
double_value=value,
)
if isinstance(value, str):
return pb.MxValue(
data_type=_data_type(data_type, pb.MX_DATA_TYPE_STRING),
variant_type="VT_BSTR",
string_value=value,
)
if isinstance(value, datetime):
return pb.MxValue(
data_type=_data_type(data_type, pb.MX_DATA_TYPE_TIME),
variant_type="VT_DATE",
timestamp_value=_timestamp_from_datetime(value),
)
if isinstance(value, bytes):
return pb.MxValue(
data_type=_data_type(data_type, pb.MX_DATA_TYPE_UNKNOWN),
variant_type="VT_RECORD",
raw_value=value,
)
if isinstance(value, Sequence):
return _sequence_to_mx_value(value, data_type=data_type)
raise TypeError(f"unsupported MxValue input type: {type(value).__name__}")
def from_mx_value(value: pb.MxValue) -> MxValueView:
"""Project a protobuf `MxValue` into an idiomatic Python value."""
kind = value.WhichOneof("kind")
if kind is None:
return MxValueView(None, "none", value)
if kind == "timestamp_value":
return MxValueView(
value.timestamp_value.ToDatetime().replace(tzinfo=timezone.utc),
kind,
value,
)
if kind == "array_value":
return MxValueView(from_mx_array(value.array_value), kind, value)
return MxValueView(getattr(value, kind), kind, value)
def from_mx_array(array: pb.MxArray) -> list[Any]:
"""Project a protobuf `MxArray` into a Python list."""
kind = array.WhichOneof("values")
if kind is None:
return []
values = list(getattr(array, kind).values)
if kind == "timestamp_values":
return [
timestamp.ToDatetime().replace(tzinfo=timezone.utc)
for timestamp in values
]
return values
def _sequence_to_mx_value(
values: Sequence[Any],
*,
data_type: str | None,
) -> pb.MxValue:
sequence = list(values)
if not sequence:
return pb.MxValue(
data_type=_data_type(data_type, pb.MX_DATA_TYPE_UNKNOWN),
array_value=pb.MxArray(
element_data_type=pb.MX_DATA_TYPE_UNKNOWN,
dimensions=[0],
),
)
first = sequence[0]
dimensions = [len(sequence)]
if all(isinstance(item, bool) for item in sequence):
array = pb.MxArray(
element_data_type=pb.MX_DATA_TYPE_BOOLEAN,
variant_type="VT_ARRAY|VT_BOOL",
dimensions=dimensions,
bool_values=pb.BoolArray(values=sequence),
)
return pb.MxValue(data_type=pb.MX_DATA_TYPE_BOOLEAN, array_value=array)
if all(isinstance(item, int) and not isinstance(item, bool) for item in sequence):
use_int32 = all(-(2**31) <= item <= (2**31 - 1) for item in sequence)
if use_int32:
array = pb.MxArray(
element_data_type=pb.MX_DATA_TYPE_INTEGER,
variant_type="VT_ARRAY|VT_I4",
dimensions=dimensions,
int32_values=pb.Int32Array(values=sequence),
)
else:
array = pb.MxArray(
element_data_type=pb.MX_DATA_TYPE_INTEGER,
variant_type="VT_ARRAY|VT_I8",
dimensions=dimensions,
int64_values=pb.Int64Array(values=sequence),
)
return pb.MxValue(data_type=pb.MX_DATA_TYPE_INTEGER, array_value=array)
if all(isinstance(item, float) for item in sequence):
array = pb.MxArray(
element_data_type=pb.MX_DATA_TYPE_DOUBLE,
variant_type="VT_ARRAY|VT_R8",
dimensions=dimensions,
double_values=pb.DoubleArray(values=sequence),
)
return pb.MxValue(data_type=pb.MX_DATA_TYPE_DOUBLE, array_value=array)
if all(isinstance(item, str) for item in sequence):
array = pb.MxArray(
element_data_type=pb.MX_DATA_TYPE_STRING,
variant_type="VT_ARRAY|VT_BSTR",
dimensions=dimensions,
string_values=pb.StringArray(values=sequence),
)
return pb.MxValue(data_type=pb.MX_DATA_TYPE_STRING, array_value=array)
if all(isinstance(item, datetime) for item in sequence):
array = pb.MxArray(
element_data_type=pb.MX_DATA_TYPE_TIME,
variant_type="VT_ARRAY|VT_DATE",
dimensions=dimensions,
timestamp_values=pb.TimestampArray(
values=[_timestamp_from_datetime(item) for item in sequence],
),
)
return pb.MxValue(data_type=pb.MX_DATA_TYPE_TIME, array_value=array)
if all(isinstance(item, bytes) for item in sequence):
array = pb.MxArray(
element_data_type=pb.MX_DATA_TYPE_UNKNOWN,
variant_type="VT_ARRAY|VT_VARIANT",
dimensions=dimensions,
raw_values=pb.RawArray(values=sequence),
)
return pb.MxValue(data_type=pb.MX_DATA_TYPE_UNKNOWN, array_value=array)
raise TypeError(
"MxValue array inputs must use one supported element type; "
f"first element was {type(first).__name__}"
)
def _timestamp_from_datetime(value: datetime) -> Timestamp:
timestamp = Timestamp()
if value.tzinfo is None:
value = value.replace(tzinfo=timezone.utc)
timestamp.FromDatetime(value.astimezone(timezone.utc))
return timestamp
def _data_type(name: str | None, default: int) -> int:
if name is None:
return default
return pb.MxDataType.Value(name)
+437 -2
View File
@@ -1,10 +1,24 @@
"""CLI scaffold for the MXAccess Gateway Python client."""
"""Command line interface for the MXAccess Gateway Python client."""
from __future__ import annotations
import asyncio
import json
import os
from collections.abc import Awaitable, Callable
from datetime import datetime, timezone
from typing import Any
import click
from google.protobuf.json_format import MessageToDict
from mxgateway import __version__
from mxgateway.auth import redact_secret
from mxgateway.client import GatewayClient
from mxgateway.errors import MxGatewayError
from mxgateway.generated import mxaccess_gateway_pb2 as pb
from mxgateway.options import ClientOptions
from mxgateway.values import MxValueInput
@click.group()
@@ -16,14 +30,435 @@ def main() -> None:
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def version(output_json: bool) -> None:
"""Print client package version information."""
payload = {
"client": "mxgw-py",
"package": "mxaccess-gateway-client",
"version": __version__,
}
_emit(payload, output_json=output_json, text=f"mxgw-py {__version__}")
def gateway_options(command: Callable[..., Any]) -> Callable[..., Any]:
command = click.option("--endpoint", default="localhost:5000", show_default=True)(command)
command = click.option("--api-key", default=None, help="Gateway API key.")(command)
command = click.option(
"--api-key-env",
default=None,
help="Environment variable containing the gateway API key.",
)(command)
command = click.option("--plaintext", is_flag=True, help="Use plaintext gRPC.")(command)
command = click.option("--tls", "use_tls", is_flag=True, help="Use TLS gRPC.")(command)
command = click.option("--ca-file", default=None, help="Custom root certificate file.")(command)
command = click.option(
"--server-name-override",
default=None,
help="TLS server name override for test environments.",
)(command)
return command
@main.command("open-session")
@gateway_options
@click.option("--client-name", default="", help="Client session name.")
@click.option("--requested-backend", default="", help="Requested backend name.")
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def open_session(**kwargs: Any) -> None:
"""Open a gateway session."""
_run(
_open_session(**kwargs),
output_json=kwargs["output_json"],
secrets=_secrets(kwargs),
)
@main.command("close-session")
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def close_session(**kwargs: Any) -> None:
"""Close a gateway session."""
_run(
_close_session(**kwargs),
output_json=kwargs["output_json"],
secrets=_secrets(kwargs),
)
@main.command()
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--message", default="ping", show_default=True, help="Ping payload.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def ping(**kwargs: Any) -> None:
"""Send a diagnostic ping command."""
_run(_ping(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
@main.command()
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--client-name", required=True, help="MXAccess client name.")
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def register(**kwargs: Any) -> None:
"""Invoke MXAccess Register."""
_run(
_register(**kwargs),
output_json=kwargs["output_json"],
secrets=_secrets(kwargs),
)
@main.command("add-item")
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
@click.option("--item", required=True, help="MXAccess item definition.")
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def add_item(**kwargs: Any) -> None:
"""Invoke MXAccess AddItem."""
_run(
_add_item(**kwargs),
output_json=kwargs["output_json"],
secrets=_secrets(kwargs),
)
@main.command()
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
@click.option("--item-handle", required=True, type=int, help="MXAccess item handle.")
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def advise(**kwargs: Any) -> None:
"""Invoke MXAccess Advise."""
_run(_advise(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
@main.command("stream-events")
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--after-worker-sequence", default=0, type=int, show_default=True)
@click.option("--max-events", default=1, type=int, show_default=True)
@click.option("--timeout", default=5.0, type=float, show_default=True)
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def stream_events(**kwargs: Any) -> None:
"""Stream a bounded number of events."""
_run(
_stream_events(**kwargs),
output_json=kwargs["output_json"],
secrets=_secrets(kwargs),
)
@main.command()
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
@click.option("--item-handle", required=True, type=int, help="MXAccess item handle.")
@click.option("--type", "value_type", default="string", show_default=True)
@click.option("--value", required=True, help="Value to write.")
@click.option("--user-id", default=0, type=int, show_default=True)
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def write(**kwargs: Any) -> None:
"""Invoke MXAccess Write."""
_run(_write(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
@main.command()
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
@click.option("--item-handle", required=True, type=int, help="MXAccess item handle.")
@click.option("--type", "value_type", default="string", show_default=True)
@click.option("--value", required=True, help="Value to write.")
@click.option("--timestamp", required=True, help="ISO-8601 timestamp value.")
@click.option("--user-id", default=0, type=int, show_default=True)
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def write2(**kwargs: Any) -> None:
"""Invoke MXAccess Write2."""
_run(_write2(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
@main.command()
@gateway_options
@click.option("--client-name", default="mxgw-py-smoke", show_default=True)
@click.option("--item", required=True, help="MXAccess item definition.")
@click.option("--max-events", default=1, type=int, show_default=True)
@click.option("--timeout", default=5.0, type=float, show_default=True)
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def smoke(**kwargs: Any) -> None:
"""Run a bounded open/register/add/advise/stream/close smoke flow."""
_run(_smoke(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
async def _open_session(**kwargs: Any) -> dict[str, Any]:
async with await _connect(kwargs) as client:
reply = await client.open_session_raw(
pb.OpenSessionRequest(
requested_backend=kwargs["requested_backend"],
client_session_name=kwargs["client_name"],
client_correlation_id=kwargs["correlation_id"],
),
)
return {"sessionId": reply.session_id, "rawReply": _message_dict(reply)}
async def _close_session(**kwargs: Any) -> dict[str, Any]:
async with await _connect(kwargs) as client:
reply = await client.close_session_raw(
pb.CloseSessionRequest(
session_id=kwargs["session_id"],
client_correlation_id=kwargs["correlation_id"],
),
)
return {"sessionId": reply.session_id, "rawReply": _message_dict(reply)}
async def _ping(**kwargs: Any) -> dict[str, Any]:
async with await _connect(kwargs) as client:
reply = await client.invoke_raw(
pb.MxCommandRequest(
session_id=kwargs["session_id"],
command=pb.MxCommand(
kind=pb.MX_COMMAND_KIND_PING,
ping=pb.PingCommand(message=kwargs["message"]),
),
),
)
return {"kind": "ping", "rawReply": _message_dict(reply)}
async def _register(**kwargs: Any) -> dict[str, Any]:
async with await _connect(kwargs) as client:
session = _session(client, kwargs["session_id"])
server_handle = await session.register(
kwargs["client_name"],
correlation_id=kwargs["correlation_id"],
)
return {"serverHandle": server_handle}
async def _add_item(**kwargs: Any) -> dict[str, Any]:
async with await _connect(kwargs) as client:
session = _session(client, kwargs["session_id"])
item_handle = await session.add_item(
kwargs["server_handle"],
kwargs["item"],
correlation_id=kwargs["correlation_id"],
)
return {"itemHandle": item_handle}
async def _advise(**kwargs: Any) -> dict[str, Any]:
async with await _connect(kwargs) as client:
session = _session(client, kwargs["session_id"])
await session.advise(
kwargs["server_handle"],
kwargs["item_handle"],
correlation_id=kwargs["correlation_id"],
)
return {"ok": True}
async def _stream_events(**kwargs: Any) -> dict[str, Any]:
async with await _connect(kwargs) as client:
session = _session(client, kwargs["session_id"])
events = await _collect_events(
session.stream_events(after_worker_sequence=kwargs["after_worker_sequence"]),
max_events=kwargs["max_events"],
timeout=kwargs["timeout"],
)
return {"events": [_message_dict(event) for event in events]}
async def _write(**kwargs: Any) -> dict[str, Any]:
value = _parse_value(kwargs["value"], kwargs["value_type"])
async with await _connect(kwargs) as client:
session = _session(client, kwargs["session_id"])
await session.write(
kwargs["server_handle"],
kwargs["item_handle"],
value,
user_id=kwargs["user_id"],
correlation_id=kwargs["correlation_id"],
)
return {"ok": True}
async def _write2(**kwargs: Any) -> dict[str, Any]:
value = _parse_value(kwargs["value"], kwargs["value_type"])
timestamp = _parse_datetime(kwargs["timestamp"])
async with await _connect(kwargs) as client:
session = _session(client, kwargs["session_id"])
await session.write2(
kwargs["server_handle"],
kwargs["item_handle"],
value,
timestamp,
user_id=kwargs["user_id"],
correlation_id=kwargs["correlation_id"],
)
return {"ok": True}
async def _smoke(**kwargs: Any) -> dict[str, Any]:
async with await _connect(kwargs) as client:
session = await client.open_session(client_session_name=kwargs["client_name"])
closed = False
try:
server_handle = await session.register(kwargs["client_name"])
item_handle = await session.add_item(server_handle, kwargs["item"])
await session.advise(server_handle, item_handle)
events = await _collect_events(
session.stream_events(),
max_events=kwargs["max_events"],
timeout=kwargs["timeout"],
)
return {
"sessionId": session.session_id,
"serverHandle": server_handle,
"itemHandle": item_handle,
"events": [_message_dict(event) for event in events],
}
finally:
if not closed:
await session.close()
async def _connect(kwargs: dict[str, Any]) -> GatewayClient:
api_key = kwargs.get("api_key") or _api_key_from_env(kwargs.get("api_key_env"))
return await GatewayClient.connect(
ClientOptions(
endpoint=kwargs["endpoint"],
api_key=api_key,
plaintext=_use_plaintext(kwargs),
ca_file=kwargs.get("ca_file"),
server_name_override=kwargs.get("server_name_override"),
),
)
def _session(client: GatewayClient, session_id: str):
from mxgateway.session import Session
return Session(client=client, session_id=session_id)
def _use_plaintext(kwargs: dict[str, Any]) -> bool:
if kwargs.get("use_tls"):
return False
if kwargs.get("plaintext"):
return True
return kwargs["endpoint"].startswith("localhost:") or kwargs["endpoint"].startswith("127.0.0.1:")
def _api_key_from_env(name: str | None) -> str | None:
if not name:
return None
return os.environ.get(name)
def _secrets(kwargs: dict[str, Any]) -> list[str | None]:
return [
kwargs.get("api_key"),
_api_key_from_env(kwargs.get("api_key_env")),
]
def _run(
awaitable: Awaitable[dict[str, Any]],
*,
output_json: bool,
secrets: list[str | None],
) -> None:
try:
payload = asyncio.run(awaitable)
except MxGatewayError as error:
raise click.ClickException(redact_secret(str(error), secrets)) from error
_emit(payload, output_json=output_json)
def _emit(
payload: dict[str, Any],
*,
output_json: bool,
text: str | None = None,
) -> None:
if output_json:
click.echo(json.dumps(payload, sort_keys=True))
return
click.echo(f"mxgw-py {__version__}")
click.echo(text or json.dumps(payload, sort_keys=True))
async def _collect_events(
events: Any,
*,
max_events: int,
timeout: float,
) -> list[pb.MxEvent]:
collected: list[pb.MxEvent] = []
iterator = events.__aiter__()
try:
while len(collected) < max_events:
collected.append(await asyncio.wait_for(iterator.__anext__(), timeout=timeout))
except StopAsyncIteration:
pass
finally:
close = getattr(iterator, "aclose", None)
if close is not None:
await close()
return collected
def _parse_value(raw_value: str, value_type: str) -> MxValueInput:
normalized = value_type.lower()
if normalized == "bool":
return raw_value.lower() in ("1", "true", "yes", "on")
if normalized in ("int", "int32", "int64"):
return int(raw_value)
if normalized in ("float", "double"):
return float(raw_value)
if normalized in ("time", "timestamp"):
return _parse_datetime(raw_value)
if normalized == "raw":
return raw_value.encode("utf-8")
if normalized == "string":
return raw_value
raise click.BadParameter(f"unsupported value type: {value_type}", param_hint="--type")
def _parse_datetime(raw_value: str) -> datetime:
if raw_value.endswith("Z"):
raw_value = raw_value[:-1] + "+00:00"
parsed = datetime.fromisoformat(raw_value)
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed
def _message_dict(message: Any) -> dict[str, Any]:
return MessageToDict(
message,
preserving_proto_field_name=False,
use_integers_for_enums=False,
)
+103
View File
@@ -0,0 +1,103 @@
"""Tests for auth metadata and connection options."""
import pytest
from mxgateway.auth import REDACTED, ApiKey, auth_metadata, redact_secret
from mxgateway import options as options_module
from mxgateway.options import ClientOptions, create_channel
def test_auth_metadata_adds_bearer_api_key() -> None:
assert auth_metadata("mxgw_test_secret") == (
("authorization", "Bearer mxgw_test_secret"),
)
def test_api_key_repr_is_redacted() -> None:
api_key = ApiKey("mxgw_test_secret")
assert "mxgw_test_secret" not in repr(api_key)
assert REDACTED in repr(api_key)
def test_redact_secret_replaces_known_values() -> None:
redacted = redact_secret(
"authorization failed for mxgw_test_secret",
["mxgw_test_secret"],
)
assert redacted == f"authorization failed for {REDACTED}"
def test_client_options_reject_plaintext_with_ca_file() -> None:
with pytest.raises(ValueError, match="ca_file"):
ClientOptions(
endpoint="localhost:5000",
plaintext=True,
ca_file="ca.pem",
)
def test_client_options_repr_redacts_api_key() -> None:
options = ClientOptions(endpoint="localhost:5000", api_key="mxgw_test_secret")
assert "mxgw_test_secret" not in repr(options)
assert REDACTED in repr(options)
def test_create_channel_uses_plaintext_channel(monkeypatch: pytest.MonkeyPatch) -> None:
calls: list[tuple[str, object]] = []
def fake_insecure_channel(endpoint: str, *, options: object) -> str:
calls.append((endpoint, options))
return "plain-channel"
monkeypatch.setattr(
options_module.grpc.aio,
"insecure_channel",
fake_insecure_channel,
)
channel = create_channel(ClientOptions(endpoint="localhost:5000", plaintext=True))
assert channel == "plain-channel"
assert calls == [("localhost:5000", [])]
def test_create_channel_uses_tls_channel(monkeypatch: pytest.MonkeyPatch) -> None:
calls: list[tuple[str, object, object]] = []
def fake_credentials(*, root_certificates: object) -> str:
assert root_certificates is None
return "creds"
def fake_secure_channel(endpoint: str, credentials: object, *, options: object) -> str:
calls.append((endpoint, credentials, options))
return "tls-channel"
monkeypatch.setattr(
options_module.grpc,
"ssl_channel_credentials",
fake_credentials,
)
monkeypatch.setattr(
options_module.grpc.aio,
"secure_channel",
fake_secure_channel,
)
channel = create_channel(
ClientOptions(
endpoint="gateway.example:5001",
server_name_override="gateway.test",
),
)
assert channel == "tls-channel"
assert calls == [
(
"gateway.example:5001",
"creds",
[("grpc.ssl_target_name_override", "gateway.test")],
),
]
+48 -1
View File
@@ -1,4 +1,4 @@
"""Tests for the Python CLI scaffold."""
"""Tests for the Python CLI."""
import json
@@ -19,3 +19,50 @@ def test_version_json_is_deterministic() -> None:
"package": "mxaccess-gateway-client",
"version": __version__,
}
def test_write_parser_rejects_unknown_value_type() -> None:
runner = CliRunner()
result = runner.invoke(
main,
[
"write",
"--session-id",
"session-1",
"--server-handle",
"12",
"--item-handle",
"34",
"--type",
"unsupported",
"--value",
"123",
"--api-key",
"mxgw_test_secret",
"--json",
],
)
assert result.exit_code != 0
assert "unsupported value type" in result.output
def test_cli_error_output_redacts_api_key() -> None:
runner = CliRunner()
result = runner.invoke(
main,
[
"open-session",
"--endpoint",
"127.0.0.1:1",
"--api-key",
"mxgw_test_secret",
"--plaintext",
"--json",
],
)
assert result.exit_code != 0
assert "mxgw_test_secret" not in result.output
+225
View File
@@ -0,0 +1,225 @@
"""Tests for the async client and session wrappers."""
from __future__ import annotations
import asyncio
from typing import Any
import pytest
from mxgateway import ClientOptions, GatewayClient, MxAccessError
from mxgateway.generated import mxaccess_gateway_pb2 as pb
@pytest.mark.asyncio
async def test_session_helpers_send_auth_metadata_and_preserve_raw_replies() -> None:
stub = FakeGatewayStub()
client = await GatewayClient.connect(
ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True),
stub=stub,
)
session = await client.open_session(client_session_name="pytest")
server_handle = await session.register("pytest-client")
item_handle = await session.add_item(server_handle, "Object.Attribute")
await session.advise(server_handle, item_handle)
assert session.session_id == "session-1"
assert server_handle == 12
assert item_handle == 34
assert stub.open_session.metadata == (("authorization", "Bearer mxgw_test_secret"),)
assert stub.invoke.requests[0].command.register.client_name == "pytest-client"
assert stub.invoke.requests[1].command.add_item.item_definition == "Object.Attribute"
assert stub.invoke.requests[2].command.advise.item_handle == 34
@pytest.mark.asyncio
async def test_mxaccess_error_preserves_raw_reply() -> None:
stub = FakeGatewayStub()
failure_reply = pb.MxCommandReply(
session_id="session-1",
kind=pb.MX_COMMAND_KIND_WRITE,
protocol_status=pb.ProtocolStatus(
code=pb.PROTOCOL_STATUS_CODE_MXACCESS_FAILURE,
message="MXAccess rejected write.",
),
hresult=-1,
)
stub.invoke.replies = [failure_reply]
client = await GatewayClient.connect(
ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True),
stub=stub,
)
session = await client.open_session()
with pytest.raises(MxAccessError) as captured:
await session.write(12, 34, 123)
assert captured.value.raw_reply is failure_reply
@pytest.mark.asyncio
async def test_stream_events_cancels_underlying_call_when_closed() -> None:
stream = FakeStream(
[
pb.MxEvent(
session_id="session-1",
worker_sequence=1,
family=pb.MX_EVENT_FAMILY_ON_DATA_CHANGE,
),
],
)
stub = FakeGatewayStub(stream=stream)
client = await GatewayClient.connect(
ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True),
stub=stub,
)
session = await client.open_session()
events = session.stream_events()
first = await anext(events)
await events.aclose()
assert first.worker_sequence == 1
assert stream.cancelled
assert stub.stream_metadata == (("authorization", "Bearer mxgw_test_secret"),)
@pytest.mark.asyncio
async def test_unary_task_cancellation_reaches_fake_call() -> None:
blocking = BlockingCancellableUnary()
stub = FakeGatewayStub()
stub.OpenSession = blocking
client = await GatewayClient.connect(
ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True),
stub=stub,
)
task = asyncio.create_task(client.open_session())
await blocking.started.wait()
task.cancel()
with pytest.raises(asyncio.CancelledError):
await task
assert blocking.call is not None
assert blocking.call.cancelled
class FakeGatewayStub:
def __init__(self, stream: "FakeStream | None" = None) -> None:
self.open_session = FakeUnary(
[
pb.OpenSessionReply(
session_id="session-1",
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
),
],
)
self.close_session = FakeUnary(
[
pb.CloseSessionReply(
session_id="session-1",
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
),
],
)
self.invoke = FakeUnary(
[
pb.MxCommandReply(
session_id="session-1",
kind=pb.MX_COMMAND_KIND_REGISTER,
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
register=pb.RegisterReply(server_handle=12),
),
pb.MxCommandReply(
session_id="session-1",
kind=pb.MX_COMMAND_KIND_ADD_ITEM,
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
add_item=pb.AddItemReply(item_handle=34),
),
pb.MxCommandReply(
session_id="session-1",
kind=pb.MX_COMMAND_KIND_ADVISE,
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
),
],
)
self.OpenSession = self.open_session
self.CloseSession = self.close_session
self.Invoke = self.invoke
self._stream = stream or FakeStream([])
self.stream_metadata: tuple[tuple[str, str], ...] | None = None
def StreamEvents(
self,
request: pb.StreamEventsRequest,
*,
metadata: tuple[tuple[str, str], ...],
) -> "FakeStream":
self.stream_request = request
self.stream_metadata = metadata
return self._stream
class FakeUnary:
def __init__(self, replies: list[Any]) -> None:
self.replies = replies
self.requests: list[Any] = []
self.metadata: tuple[tuple[str, str], ...] | None = None
async def __call__(
self,
request: Any,
*,
metadata: tuple[tuple[str, str], ...],
) -> Any:
self.requests.append(request)
self.metadata = metadata
return self.replies.pop(0)
class BlockingCancellableUnary:
def __init__(self) -> None:
self.started = asyncio.Event()
self.call: BlockingCall | None = None
def __call__(self, *_args: Any, **_kwargs: Any) -> "BlockingCall":
self.call = BlockingCall(self.started)
return self.call
class BlockingCall:
def __init__(self, started: asyncio.Event) -> None:
self.started = started
self.cancelled = False
def __await__(self):
return self._wait().__await__()
async def _wait(self) -> Any:
self.started.set()
try:
await asyncio.Event().wait()
except asyncio.CancelledError:
raise
def cancel(self) -> None:
self.cancelled = True
class FakeStream:
def __init__(self, events: list[pb.MxEvent]) -> None:
self._events = events
self.cancelled = False
def __aiter__(self) -> "FakeStream":
return self
async def __anext__(self) -> pb.MxEvent:
if not self._events:
await asyncio.sleep(3600)
return self._events.pop(0)
def cancel(self) -> None:
self.cancelled = True
+49
View File
@@ -0,0 +1,49 @@
"""Tests for typed command error mapping."""
import json
from pathlib import Path
import pytest
from google.protobuf.json_format import ParseDict
from mxgateway.errors import ensure_mxaccess_success, ensure_protocol_success
from mxgateway import MxAccessError, MxGatewaySessionError
from mxgateway.generated import mxaccess_gateway_pb2 as pb
FIXTURE_ROOT = Path(__file__).resolve().parents[2] / "proto" / "fixtures" / "behavior"
def test_register_fixture_is_protocol_and_mxaccess_success() -> None:
reply = _load_reply("command-replies/register.ok.reply.json")
assert ensure_protocol_success("register", reply.protocol_status, reply) is reply
assert ensure_mxaccess_success("register", reply) is reply
def test_write_failure_fixture_preserves_raw_reply() -> None:
reply = _load_reply("command-replies/write.mxaccess-failure.reply.json")
assert ensure_protocol_success("write", reply.protocol_status, reply) is reply
with pytest.raises(MxAccessError) as captured:
ensure_mxaccess_success("write", reply)
assert captured.value.raw_reply is reply
assert captured.value.raw_reply.hresult == -2147220992
assert len(captured.value.raw_reply.statuses) == 2
def test_session_status_maps_to_session_error() -> None:
status = pb.ProtocolStatus(
code=pb.PROTOCOL_STATUS_CODE_SESSION_NOT_FOUND,
message="session missing",
)
with pytest.raises(MxGatewaySessionError) as captured:
ensure_protocol_success("invoke", status)
assert captured.value.protocol_status is status
def _load_reply(name: str) -> pb.MxCommandReply:
payload = json.loads((FIXTURE_ROOT / name).read_text(encoding="utf-8"))
return ParseDict(payload, pb.MxCommandReply())
+49
View File
@@ -0,0 +1,49 @@
"""Tests for MXAccess value conversion helpers."""
import json
import re
from datetime import datetime, timezone
from pathlib import Path
from google.protobuf.json_format import ParseDict
from mxgateway.generated import mxaccess_gateway_pb2 as pb
from mxgateway.values import from_mx_value, to_mx_value
FIXTURE_ROOT = Path(__file__).resolve().parents[2] / "proto" / "fixtures" / "behavior"
def test_value_conversion_fixtures_project_expected_oneof_kind() -> None:
payload = json.loads(
(FIXTURE_ROOT / "values" / "value-conversion-cases.json").read_text(
encoding="utf-8",
),
)
for case in payload["cases"]:
value = ParseDict(case["value"], pb.MxValue())
projection = from_mx_value(value)
assert projection.kind == _snake_case(case["expectedKind"])
assert projection.raw is value
def test_to_mx_value_supports_scalar_and_array_inputs() -> None:
assert to_mx_value(True).WhichOneof("kind") == "bool_value"
assert to_mx_value(12).int32_value == 12
assert to_mx_value(2**40).int64_value == 2**40
assert to_mx_value(12.5).double_value == 12.5
assert to_mx_value("abc").string_value == "abc"
assert to_mx_value([1, 2]).array_value.int32_values.values == [1, 2]
assert to_mx_value(["a", "b"]).array_value.string_values.values == ["a", "b"]
def test_to_mx_value_uses_utc_timestamps() -> None:
value = to_mx_value(datetime(2026, 1, 1, 0, 0, 4, tzinfo=timezone.utc))
assert value.data_type == pb.MX_DATA_TYPE_TIME
assert value.timestamp_value.seconds == 1767225604
def _snake_case(value: str) -> str:
return re.sub(r"(?<!^)(?=[A-Z])", "_", value).lower()
+131 -1
View File
@@ -145,6 +145,16 @@ version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33"
[[package]]
name = "cc"
version = "1.2.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d16d90359e986641506914ba71350897565610e87ce0ad9e6f28569db3dd5c6d"
dependencies = [
"find-msvc-tools",
"shlex",
]
[[package]]
name = "cfg-if"
version = "1.0.4"
@@ -225,6 +235,12 @@ version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6"
[[package]]
name = "find-msvc-tools"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582"
[[package]]
name = "fixedbitset"
version = "0.5.7"
@@ -258,6 +274,17 @@ version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d"
[[package]]
name = "futures-macro"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.32"
@@ -277,11 +304,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6"
dependencies = [
"futures-core",
"futures-macro",
"futures-task",
"pin-project-lite",
"slab",
]
[[package]]
name = "getrandom"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0"
dependencies = [
"cfg-if",
"libc",
"wasi",
]
[[package]]
name = "getrandom"
version = "0.4.2"
@@ -537,11 +576,14 @@ checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084"
name = "mxgateway-client"
version = "0.1.0"
dependencies = [
"futures-core",
"futures-util",
"prost",
"prost-types",
"serde_json",
"thiserror",
"tokio",
"tokio-stream",
"tonic",
"tonic-build",
]
@@ -551,8 +593,11 @@ name = "mxgw-cli"
version = "0.1.0"
dependencies = [
"clap",
"futures-util",
"mxgateway-client",
"serde",
"serde_json",
"tokio",
]
[[package]]
@@ -724,6 +769,20 @@ version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a"
[[package]]
name = "ring"
version = "0.17.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7"
dependencies = [
"cc",
"cfg-if",
"getrandom 0.2.17",
"libc",
"untrusted",
"windows-sys 0.52.0",
]
[[package]]
name = "rustix"
version = "1.1.4"
@@ -737,6 +796,41 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "rustls"
version = "0.23.39"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c2c118cb077cca2822033836dfb1b975355dfb784b5e8da48f7b6c5db74e60e"
dependencies = [
"log",
"once_cell",
"ring",
"rustls-pki-types",
"rustls-webpki",
"subtle",
"zeroize",
]
[[package]]
name = "rustls-pki-types"
version = "1.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30a7197ae7eb376e574fe940d068c30fe0462554a3ddbe4eca7838e049c937a9"
dependencies = [
"zeroize",
]
[[package]]
name = "rustls-webpki"
version = "0.103.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61c429a8649f110dddef65e2a5ad240f747e85f7758a6bccc7e5777bd33f756e"
dependencies = [
"ring",
"rustls-pki-types",
"untrusted",
]
[[package]]
name = "semver"
version = "1.0.28"
@@ -750,6 +844,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e"
dependencies = [
"serde_core",
"serde_derive",
]
[[package]]
@@ -785,6 +880,12 @@ dependencies = [
"zmij",
]
[[package]]
name = "shlex"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "slab"
version = "0.4.12"
@@ -823,6 +924,12 @@ version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
[[package]]
name = "subtle"
version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]]
name = "syn"
version = "2.0.117"
@@ -847,7 +954,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd"
dependencies = [
"fastrand",
"getrandom",
"getrandom 0.4.2",
"once_cell",
"rustix",
"windows-sys 0.61.2",
@@ -899,6 +1006,16 @@ dependencies = [
"syn",
]
[[package]]
name = "tokio-rustls"
version = "0.26.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61"
dependencies = [
"rustls",
"tokio",
]
[[package]]
name = "tokio-stream"
version = "0.1.18"
@@ -945,6 +1062,7 @@ dependencies = [
"prost",
"socket2 0.5.10",
"tokio",
"tokio-rustls",
"tokio-stream",
"tower",
"tower-layer",
@@ -1046,6 +1164,12 @@ version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853"
[[package]]
name = "untrusted"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
[[package]]
name = "utf8parse"
version = "0.2.2"
@@ -1301,6 +1425,12 @@ dependencies = [
"wasmparser",
]
[[package]]
name = "zeroize"
version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0"
[[package]]
name = "zmij"
version = "1.0.21"
+9 -2
View File
@@ -16,24 +16,31 @@ publish = false
[workspace.dependencies]
clap = { version = "4.5.53", features = ["derive"] }
futures-core = "0.3.31"
futures-util = "0.3.31"
prost = "0.13.5"
prost-types = "0.13.5"
serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.145"
thiserror = "2.0.17"
tokio = { version = "1.48.0", features = ["macros", "rt-multi-thread"] }
tonic = { version = "0.13.1", features = ["transport"] }
tokio = { version = "1.48.0", features = ["macros", "rt-multi-thread", "sync", "time"] }
tokio-stream = { version = "0.1.17", features = ["net"] }
tonic = { version = "0.13.1", features = ["transport", "tls-ring"] }
tonic-build = "0.13.1"
[dependencies]
futures-core = { workspace = true }
futures-util = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true }
[dev-dependencies]
serde_json = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
[build-dependencies]
tonic-build = { workspace = true }
+40 -3
View File
@@ -1,7 +1,8 @@
# Rust Client Workspace
The Rust client workspace contains the MXAccess Gateway client library, a
test CLI, and scaffold tests for generated contract wiring. The library uses
test CLI, and tests for generated contract wiring plus wrapper behavior. The
library uses
the shared protobuf inputs documented in
`../../docs/client-proto-generation.md` so the Rust bindings compile against
the same public gateway and worker contracts as the server.
@@ -31,6 +32,7 @@ Run the Rust workspace checks from `clients/rust`:
cargo fmt --all --check
cargo test --workspace
cargo check --workspace
cargo clippy --workspace --all-targets -- -D warnings
```
The build script uses `protoc` from `PATH` or the Windows path recorded in
@@ -38,13 +40,48 @@ The build script uses `protoc` from `PATH` or the Windows path recorded in
## CLI
The scaffold CLI exposes version information:
The CLI exposes version, session, command, event stream, write, and smoke
commands over the same client wrapper used by tests:
```powershell
cargo run -p mxgw-cli -- version --json
cargo run -p mxgw-cli -- open-session --endpoint http://localhost:5000 --api-key-env MXGATEWAY_API_KEY --json
cargo run -p mxgw-cli -- register --session-id <session-id> --client-name mxgw-rust-cli --json
cargo run -p mxgw-cli -- add-item --session-id <session-id> --server-handle 1 --item TestChildObject.TestInt --json
cargo run -p mxgw-cli -- advise --session-id <session-id> --server-handle 1 --item-handle 1 --json
cargo run -p mxgw-cli -- stream-events --session-id <session-id> --max-events 1 --json
cargo run -p mxgw-cli -- write --session-id <session-id> --server-handle 1 --item-handle 1 --value-type int32 --value 123 --json
```
Additional commands are implemented with the client/session wrapper work.
Use `--tls`, `--ca-file`, and `--server-name-override` for TLS endpoints. The
CLI reads the API key from `--api-key` or from `--api-key-env`, which defaults
to `MXGATEWAY_API_KEY`. API keys are redacted by the library option and secret
types.
## Library Surface
`ClientOptions` configures endpoint, API key, plaintext or TLS transport,
timeouts, custom CA files, and server name override. `GatewayClient::connect`
creates an authenticated `tonic` client and attaches `authorization: Bearer
<api-key>` metadata to unary and streaming calls.
`GatewayClient` exposes raw generated calls through `open_session_raw`,
`close_session_raw`, `invoke_raw`, `stream_events`, and `raw_client`. The
session helpers keep MXAccess handles visible:
```rust
let session = client.open_session(request).await?;
let server_handle = session.register("mxgw-rust").await?;
let item_handle = session.add_item(server_handle, "TestChildObject.TestInt").await?;
session.advise(server_handle, item_handle).await?;
let mut events = session.events().await?;
session.close().await?;
```
`MxValue`, `MxArrayValue`, and `MxStatus` wrap generated protobuf messages while
preserving the raw message for parity diagnostics. Command replies whose
protocol status is not `PROTOCOL_STATUS_CODE_OK` become `Error::Command` and
retain the raw `MxCommandReply`.
## Related Documentation
+1 -1
View File
@@ -19,7 +19,7 @@ fn main() -> Result<(), Box<dyn Error>> {
println!("cargo:rerun-if-changed={}", worker_proto.display());
tonic_build::configure()
.build_server(false)
.build_server(true)
.build_client(true)
.file_descriptor_set_path(descriptor_path)
.compile_protos(
+3
View File
@@ -10,5 +10,8 @@ path = "src/main.rs"
[dependencies]
clap = { workspace = true }
futures-util = { workspace = true }
mxgateway-client = { path = "../.." }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
+518 -14
View File
@@ -1,8 +1,20 @@
use std::env;
use std::path::PathBuf;
use std::process::ExitCode;
use std::time::Duration;
use clap::{Parser, Subcommand};
use mxgateway_client::{CLIENT_VERSION, GATEWAY_PROTOCOL_VERSION, WORKER_PROTOCOL_VERSION};
use clap::{Args, Parser, Subcommand, ValueEnum};
use futures_util::StreamExt;
use mxgateway_client::generated::mxaccess_gateway::v1::{
CloseSessionRequest, MxCommand, MxCommandKind, MxCommandRequest, OpenSessionRequest,
PingCommand, StreamEventsRequest,
};
use mxgateway_client::{
ApiKey, ClientOptions, Error, GatewayClient, MxValue, CLIENT_VERSION, GATEWAY_PROTOCOL_VERSION,
WORKER_PROTOCOL_VERSION,
};
use serde_json::json;
use serde_json::Value;
#[derive(Debug, Parser)]
#[command(name = "mxgw")]
@@ -18,30 +30,428 @@ enum Command {
#[arg(long)]
json: bool,
},
Ping {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long, default_value = "ping")]
message: String,
#[arg(long)]
json: bool,
},
OpenSession {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long, default_value = "mxgw-rust-cli")]
client_name: String,
#[arg(long)]
json: bool,
},
CloseSession {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
session_id: String,
#[arg(long)]
json: bool,
},
Register {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
session_id: String,
#[arg(long, default_value = "mxgw-rust-cli")]
client_name: String,
#[arg(long)]
json: bool,
},
AddItem {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
session_id: String,
#[arg(long)]
server_handle: i32,
#[arg(long)]
item: String,
#[arg(long)]
json: bool,
},
Advise {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
session_id: String,
#[arg(long)]
server_handle: i32,
#[arg(long)]
item_handle: i32,
#[arg(long)]
json: bool,
},
StreamEvents {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
session_id: String,
#[arg(long, default_value_t = 0)]
after_worker_sequence: u64,
#[arg(long, default_value_t = 1)]
max_events: usize,
#[arg(long)]
json: bool,
},
Write {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
session_id: String,
#[arg(long)]
server_handle: i32,
#[arg(long)]
item_handle: i32,
#[arg(long, value_enum)]
value_type: CliValueType,
#[arg(long)]
value: String,
#[arg(long, default_value_t = 0)]
user_id: i32,
#[arg(long)]
json: bool,
},
Write2 {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
session_id: String,
#[arg(long)]
server_handle: i32,
#[arg(long)]
item_handle: i32,
#[arg(long, value_enum)]
value_type: CliValueType,
#[arg(long)]
value: String,
#[arg(long)]
timestamp: String,
#[arg(long, default_value_t = 0)]
user_id: i32,
#[arg(long)]
json: bool,
},
Smoke {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
item: String,
#[arg(long, default_value = "mxgw-rust-smoke")]
client_name: String,
#[arg(long)]
json: bool,
},
}
fn main() -> ExitCode {
#[derive(Debug, Args, Clone)]
struct ConnectionArgs {
#[arg(long, default_value = "http://127.0.0.1:5000")]
endpoint: String,
#[arg(long)]
api_key: Option<String>,
#[arg(long, default_value = "MXGATEWAY_API_KEY")]
api_key_env: String,
#[arg(long)]
plaintext: bool,
#[arg(long)]
tls: bool,
#[arg(long)]
ca_file: Option<PathBuf>,
#[arg(long)]
server_name_override: Option<String>,
#[arg(long, default_value_t = 10)]
connect_timeout_seconds: u64,
#[arg(long, default_value_t = 30)]
call_timeout_seconds: u64,
}
impl ConnectionArgs {
fn options(&self) -> ClientOptions {
let mut options = ClientOptions::new(self.endpoint.clone())
.with_plaintext(!self.tls || self.plaintext)
.with_connect_timeout(Duration::from_secs(self.connect_timeout_seconds))
.with_call_timeout(Duration::from_secs(self.call_timeout_seconds));
if let Some(api_key) = self
.api_key
.clone()
.or_else(|| env::var(&self.api_key_env).ok())
.filter(|value| !value.is_empty())
{
options = options.with_api_key(ApiKey::new(api_key));
}
if let Some(ca_file) = &self.ca_file {
options = options.with_ca_file(ca_file);
}
if let Some(server_name_override) = &self.server_name_override {
options = options.with_server_name_override(server_name_override);
}
options
}
}
#[derive(Clone, Copy, Debug, ValueEnum)]
enum CliValueType {
Bool,
Int32,
Int64,
Float,
Double,
String,
}
#[tokio::main]
async fn main() -> ExitCode {
let cli = Cli::parse();
run(cli);
ExitCode::SUCCESS
match run(cli).await {
Ok(()) => ExitCode::SUCCESS,
Err(error) => {
eprintln!("{error}");
ExitCode::FAILURE
}
}
}
fn run(cli: Cli) {
async fn run(cli: Cli) -> Result<(), Error> {
match cli.command {
Command::Version { json } => print_version(json),
Command::Ping {
connection,
message,
json,
} => {
let client = connect(connection).await?;
let reply = client
.invoke(MxCommandRequest {
client_correlation_id: "rust-cli-ping".to_owned(),
command: Some(MxCommand {
kind: MxCommandKind::Ping as i32,
payload: Some(mxgateway_client::generated::mxaccess_gateway::v1::mx_command::Payload::Ping(
PingCommand { message },
)),
}),
..MxCommandRequest::default()
})
.await?;
print_command_reply("ping", &reply, json);
}
Command::OpenSession {
connection,
client_name,
json,
} => {
let client = connect(connection).await?;
let reply = client
.open_session_raw(OpenSessionRequest {
client_session_name: client_name,
..OpenSessionRequest::default()
})
.await?;
if json {
println!(
"{}",
json!({
"sessionId": reply.session_id,
"backendName": reply.backend_name,
"gatewayProtocolVersion": reply.gateway_protocol_version,
"workerProtocolVersion": reply.worker_protocol_version,
})
);
} else {
println!("{}", reply.session_id);
}
}
Command::CloseSession {
connection,
session_id,
json,
} => {
let client = connect(connection).await?;
let reply = client
.close_session_raw(CloseSessionRequest {
session_id,
client_correlation_id: "rust-cli-close-session".to_owned(),
})
.await?;
if json {
println!("{}", json!({ "sessionId": reply.session_id }));
} else {
println!("closed {}", reply.session_id);
}
}
Command::Register {
connection,
session_id,
client_name,
json,
} => {
let session = session_for(connection, session_id).await?;
let server_handle = session.register(&client_name).await?;
print_handle("serverHandle", server_handle, json);
}
Command::AddItem {
connection,
session_id,
server_handle,
item,
json,
} => {
let session = session_for(connection, session_id).await?;
let item_handle = session.add_item(server_handle, &item).await?;
print_handle("itemHandle", item_handle, json);
}
Command::Advise {
connection,
session_id,
server_handle,
item_handle,
json,
} => {
let session = session_for(connection, session_id).await?;
session.advise(server_handle, item_handle).await?;
print_ok("advise", json);
}
Command::StreamEvents {
connection,
session_id,
after_worker_sequence,
max_events,
json,
} => {
let client = connect(connection).await?;
let mut stream = client
.stream_events(StreamEventsRequest {
session_id,
after_worker_sequence,
})
.await?;
let mut events = Vec::new();
while events.len() < max_events {
let Some(event) = stream.next().await else {
break;
};
events.push(event?);
}
if json {
println!("{}", json!({ "eventCount": events.len() }));
} else {
for event in events {
println!("{} {}", event.worker_sequence, event.family);
}
}
}
Command::Write {
connection,
session_id,
server_handle,
item_handle,
value_type,
value,
user_id,
json,
} => {
let session = session_for(connection, session_id).await?;
session
.write(
server_handle,
item_handle,
parse_value(value_type, &value)?,
user_id,
)
.await?;
print_ok("write", json);
}
Command::Write2 {
connection,
session_id,
server_handle,
item_handle,
value_type,
value,
timestamp,
user_id,
json,
} => {
let session = session_for(connection, session_id).await?;
session
.write2(
server_handle,
item_handle,
parse_value(value_type, &value)?,
MxValue::string(timestamp),
user_id,
)
.await?;
print_ok("write2", json);
}
Command::Smoke {
connection,
item,
client_name,
json,
} => {
let client = connect(connection).await?;
let session = client
.open_session(OpenSessionRequest {
client_session_name: client_name.clone(),
..OpenSessionRequest::default()
})
.await?;
let result = async {
let server_handle = session.register(&client_name).await?;
let item_handle = session.add_item(server_handle, &item).await?;
session.advise(server_handle, item_handle).await?;
Ok::<_, Error>((server_handle, item_handle))
}
.await;
let close_result = session.close().await;
let (server_handle, item_handle) = result?;
close_result?;
if json {
println!(
"{}",
json!({
"sessionId": session.id(),
"serverHandle": server_handle,
"itemHandle": item_handle,
"closed": true,
})
);
} else {
println!(
"session {} registered server {server_handle}, item {item_handle}, closed",
session.id()
);
}
}
}
Ok(())
}
async fn connect(connection: ConnectionArgs) -> Result<GatewayClient, Error> {
GatewayClient::connect(connection.options()).await
}
async fn session_for(
connection: ConnectionArgs,
session_id: String,
) -> Result<mxgateway_client::Session, Error> {
let client = connect(connection).await?;
Ok(client.session(session_id))
}
fn print_version(use_json: bool) {
if use_json {
println!(
"{}",
json!({
"clientVersion": CLIENT_VERSION,
"gatewayProtocolVersion": GATEWAY_PROTOCOL_VERSION,
"workerProtocolVersion": WORKER_PROTOCOL_VERSION,
})
);
println!("{}", version_json());
return;
}
@@ -50,6 +460,73 @@ fn print_version(use_json: bool) {
println!("worker protocol {WORKER_PROTOCOL_VERSION}");
}
fn version_json() -> Value {
json!({
"clientVersion": CLIENT_VERSION,
"gatewayProtocolVersion": GATEWAY_PROTOCOL_VERSION,
"workerProtocolVersion": WORKER_PROTOCOL_VERSION,
})
}
fn print_command_reply(
operation: &str,
reply: &mxgateway_client::generated::mxaccess_gateway::v1::MxCommandReply,
use_json: bool,
) {
if use_json {
println!(
"{}",
json!({
"operation": operation,
"sessionId": reply.session_id,
"correlationId": reply.correlation_id,
"kind": reply.kind,
})
);
} else {
println!("{operation} completed");
}
}
fn print_handle(name: &str, handle: i32, use_json: bool) {
if use_json {
println!("{}", json!({ name: handle }));
} else {
println!("{handle}");
}
}
fn print_ok(operation: &str, use_json: bool) {
if use_json {
println!("{}", json!({ "operation": operation, "ok": true }));
} else {
println!("{operation} completed");
}
}
fn parse_value(value_type: CliValueType, value: &str) -> Result<MxValue, Error> {
let parsed = match value_type {
CliValueType::Bool => MxValue::bool(parse_cli_value(value)?),
CliValueType::Int32 => MxValue::int32(parse_cli_value(value)?),
CliValueType::Int64 => MxValue::int64(parse_cli_value(value)?),
CliValueType::Float => MxValue::float(parse_cli_value(value)?),
CliValueType::Double => MxValue::double(parse_cli_value(value)?),
CliValueType::String => MxValue::string(value),
};
Ok(parsed)
}
fn parse_cli_value<T>(value: &str) -> Result<T, Error>
where
T: std::str::FromStr,
T::Err: std::fmt::Display,
{
value.parse::<T>().map_err(|source| Error::InvalidArgument {
name: "value".to_owned(),
detail: source.to_string(),
})
}
#[cfg(test)]
mod tests {
use clap::Parser;
@@ -61,4 +538,31 @@ mod tests {
let parsed = Cli::try_parse_from(["mxgw", "version", "--json"]);
assert!(parsed.is_ok());
}
#[test]
fn parses_write_command() {
let parsed = Cli::try_parse_from([
"mxgw",
"write",
"--session-id",
"session-1",
"--server-handle",
"12",
"--item-handle",
"34",
"--value-type",
"int32",
"--value",
"123",
]);
assert!(parsed.is_ok());
}
#[test]
fn version_json_output_has_protocol_versions() {
let value = super::version_json();
assert_eq!(value["gatewayProtocolVersion"], 1);
assert_eq!(value["workerProtocolVersion"], 1);
}
}
+57
View File
@@ -1,5 +1,9 @@
use std::fmt;
use tonic::metadata::MetadataValue;
use tonic::service::Interceptor;
use tonic::{Request, Status};
/// API key wrapper that avoids exposing raw credentials in formatted output.
#[derive(Clone, Eq, PartialEq)]
pub struct ApiKey(String);
@@ -28,3 +32,56 @@ impl fmt::Display for ApiKey {
formatter.write_str("<redacted>")
}
}
/// `tonic` interceptor that attaches gateway API key metadata.
#[derive(Clone, Debug, Default)]
pub struct AuthInterceptor {
api_key: Option<ApiKey>,
}
impl AuthInterceptor {
pub fn new(api_key: Option<ApiKey>) -> Self {
Self { api_key }
}
}
impl Interceptor for AuthInterceptor {
fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
if let Some(api_key) = &self.api_key {
let header_value = format!("Bearer {}", api_key.expose_secret())
.parse::<MetadataValue<_>>()
.map_err(|_| Status::unauthenticated("invalid API key metadata"))?;
request.metadata_mut().insert("authorization", header_value);
}
Ok(request)
}
}
#[cfg(test)]
mod tests {
use tonic::service::Interceptor;
use tonic::Request;
use super::{ApiKey, AuthInterceptor};
#[test]
fn api_key_debug_is_redacted() {
let key = ApiKey::new("mxgw_visible_secret");
assert_eq!(format!("{key:?}"), "ApiKey(\"<redacted>\")");
assert!(!format!("{key:?}").contains("visible_secret"));
assert_eq!(key.to_string(), "<redacted>");
}
#[test]
fn interceptor_attaches_bearer_metadata() {
let mut interceptor = AuthInterceptor::new(Some(ApiKey::new("mxgw_fixture_secret")));
let request = interceptor.call(Request::new(())).unwrap();
assert_eq!(
request.metadata().get("authorization").unwrap(),
"Bearer mxgw_fixture_secret"
);
}
}
+103 -10
View File
@@ -1,30 +1,123 @@
use tonic::transport::Channel;
use std::fs;
use crate::error::Error;
use tonic::codegen::InterceptedService;
use tonic::transport::{Certificate, Channel, ClientTlsConfig};
use tonic::Request;
use crate::auth::AuthInterceptor;
use crate::error::{ensure_command_success, Error};
use crate::generated::mxaccess_gateway::v1::mx_access_gateway_client::MxAccessGatewayClient;
use crate::generated::mxaccess_gateway::v1::{
CloseSessionReply, CloseSessionRequest, MxCommandReply, MxCommandRequest, MxEvent,
OpenSessionReply, OpenSessionRequest, StreamEventsRequest,
};
use crate::options::ClientOptions;
use crate::session::Session;
pub type RawGatewayClient = MxAccessGatewayClient<InterceptedService<Channel, AuthInterceptor>>;
pub type EventStream =
std::pin::Pin<Box<dyn futures_core::Stream<Item = Result<MxEvent, Error>> + Send + 'static>>;
/// Thin owner for the generated gateway client.
#[derive(Clone)]
pub struct GatewayClient {
inner: MxAccessGatewayClient<Channel>,
inner: RawGatewayClient,
call_timeout: std::time::Duration,
}
impl GatewayClient {
pub async fn connect(options: ClientOptions) -> Result<Self, Error> {
let endpoint = Channel::from_shared(options.endpoint().to_owned()).map_err(|source| {
Error::InvalidEndpoint {
endpoint: options.endpoint().to_owned(),
detail: source.to_string(),
let mut endpoint =
Channel::from_shared(options.endpoint().to_owned()).map_err(|source| {
Error::InvalidEndpoint {
endpoint: options.endpoint().to_owned(),
detail: source.to_string(),
}
})?;
endpoint = endpoint.connect_timeout(options.connect_timeout());
if !options.plaintext() {
let mut tls = ClientTlsConfig::new();
if let Some(server_name) = options.server_name_override() {
tls = tls.domain_name(server_name.to_owned());
}
})?;
if let Some(ca_file) = options.ca_file() {
let certificate = fs::read(ca_file).map_err(|source| Error::InvalidEndpoint {
endpoint: options.endpoint().to_owned(),
detail: format!("failed to read CA file {}: {source}", ca_file.display()),
})?;
tls = tls.ca_certificate(Certificate::from_pem(certificate));
}
endpoint = endpoint.tls_config(tls)?;
}
let channel = endpoint.connect().await?;
let interceptor = AuthInterceptor::new(options.api_key().cloned());
Ok(Self {
inner: MxAccessGatewayClient::new(channel),
inner: MxAccessGatewayClient::with_interceptor(channel, interceptor),
call_timeout: options.call_timeout(),
})
}
pub fn into_inner(self) -> MxAccessGatewayClient<Channel> {
pub fn raw_client(&mut self) -> &mut RawGatewayClient {
&mut self.inner
}
pub fn into_inner(self) -> RawGatewayClient {
self.inner
}
pub fn session(&self, session_id: impl Into<String>) -> Session {
Session::new(session_id, self.clone())
}
pub async fn open_session_raw(
&self,
request: OpenSessionRequest,
) -> Result<OpenSessionReply, Error> {
let mut client = self.inner.clone();
let response = client.open_session(self.unary_request(request)).await?;
Ok(response.into_inner())
}
pub async fn open_session(&self, request: OpenSessionRequest) -> Result<Session, Error> {
let reply = self.open_session_raw(request).await?;
Ok(Session::new(reply.session_id, self.clone()))
}
pub async fn close_session_raw(
&self,
request: CloseSessionRequest,
) -> Result<CloseSessionReply, Error> {
let mut client = self.inner.clone();
let response = client.close_session(self.unary_request(request)).await?;
Ok(response.into_inner())
}
pub async fn invoke_raw(&self, request: MxCommandRequest) -> Result<MxCommandReply, Error> {
let mut client = self.inner.clone();
let response = client.invoke(self.unary_request(request)).await?;
Ok(response.into_inner())
}
pub async fn invoke(&self, request: MxCommandRequest) -> Result<MxCommandReply, Error> {
ensure_command_success(self.invoke_raw(request).await?)
}
pub async fn stream_events(&self, request: StreamEventsRequest) -> Result<EventStream, Error> {
let mut client = self.inner.clone();
let response = client.stream_events(self.unary_request(request)).await?;
let stream = futures_util::StreamExt::map(response.into_inner(), |result| {
result.map_err(Error::from)
});
Ok(Box::pin(stream))
}
fn unary_request<T>(&self, message: T) -> Request<T> {
let mut request = Request::new(message);
request.set_timeout(self.call_timeout);
request
}
}
+149 -1
View File
@@ -1,13 +1,161 @@
use thiserror::Error as ThisError;
use tonic::Code;
use crate::generated::mxaccess_gateway::v1::{MxCommandReply, ProtocolStatusCode};
#[derive(Debug, ThisError)]
pub enum Error {
#[error("invalid gateway endpoint `{endpoint}`: {detail}")]
InvalidEndpoint { endpoint: String, detail: String },
#[error("invalid argument `{name}`: {detail}")]
InvalidArgument { name: String, detail: String },
#[error("gateway transport error: {0}")]
Transport(#[from] tonic::transport::Error),
#[error("authentication failed: {message}")]
Authentication {
message: String,
#[source]
status: Box<tonic::Status>,
},
#[error("authorization failed: {message}")]
Authorization {
message: String,
#[source]
status: Box<tonic::Status>,
},
#[error("gateway call timed out: {message}")]
Timeout {
message: String,
#[source]
status: Box<tonic::Status>,
},
#[error("gateway call cancelled: {message}")]
Cancelled {
message: String,
#[source]
status: Box<tonic::Status>,
},
#[error("gateway status error: {0}")]
Status(#[from] tonic::Status),
Status(Box<tonic::Status>),
#[error("gateway command failed: {0}")]
Command(#[from] Box<CommandError>),
}
#[derive(Clone, Debug)]
pub struct CommandError {
reply: MxCommandReply,
}
impl CommandError {
pub fn new(reply: MxCommandReply) -> Self {
Self { reply }
}
pub fn reply(&self) -> &MxCommandReply {
&self.reply
}
pub fn into_reply(self) -> MxCommandReply {
self.reply
}
}
impl std::fmt::Display for CommandError {
fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let status = self.reply.protocol_status.as_ref();
let code = status
.and_then(|status| ProtocolStatusCode::try_from(status.code).ok())
.unwrap_or(ProtocolStatusCode::Unspecified);
let message = status.map(|status| status.message.as_str()).unwrap_or("");
if message.is_empty() {
write!(formatter, "{code:?}")
} else {
write!(formatter, "{code:?}: {message}")
}
}
}
impl std::error::Error for CommandError {}
impl From<tonic::Status> for Error {
fn from(status: tonic::Status) -> Self {
let message = redact_credentials(status.message());
match status.code() {
Code::Unauthenticated => Self::Authentication {
message,
status: Box::new(status),
},
Code::PermissionDenied => Self::Authorization {
message,
status: Box::new(status),
},
Code::DeadlineExceeded => Self::Timeout {
message,
status: Box::new(status),
},
Code::Cancelled => Self::Cancelled {
message,
status: Box::new(status),
},
_ => Self::Status(Box::new(status)),
}
}
}
pub fn ensure_command_success(reply: MxCommandReply) -> Result<MxCommandReply, Error> {
let code = reply
.protocol_status
.as_ref()
.and_then(|status| ProtocolStatusCode::try_from(status.code).ok())
.unwrap_or(ProtocolStatusCode::Unspecified);
if code == ProtocolStatusCode::Ok {
Ok(reply)
} else {
Err(Box::new(CommandError::new(reply)).into())
}
}
fn redact_credentials(message: &str) -> String {
message
.split_whitespace()
.map(|part| {
if part.starts_with("mxgw_") || part.eq_ignore_ascii_case("bearer") {
"<redacted>"
} else {
part
}
})
.collect::<Vec<_>>()
.join(" ")
}
#[cfg(test)]
mod tests {
use tonic::{Code, Status};
use super::Error;
#[test]
fn classifies_authentication_status() {
let error = Error::from(Status::new(
Code::Unauthenticated,
"invalid API key mxgw_visible_secret",
));
let message = error.to_string();
assert!(matches!(error, Error::Authentication { .. }));
assert!(message.contains("<redacted>"));
assert!(!message.contains("visible_secret"));
}
}
+4 -3
View File
@@ -13,9 +13,10 @@ pub mod session;
pub mod value;
pub mod version;
pub use auth::ApiKey;
pub use client::GatewayClient;
pub use error::Error;
pub use auth::{ApiKey, AuthInterceptor};
pub use client::{EventStream, GatewayClient};
pub use error::{CommandError, Error};
pub use options::ClientOptions;
pub use session::Session;
pub use value::{MxArrayProjection, MxArrayValue, MxStatus, MxValue, MxValueProjection};
pub use version::{CLIENT_VERSION, GATEWAY_PROTOCOL_VERSION, WORKER_PROTOCOL_VERSION};
+72
View File
@@ -1,4 +1,6 @@
use std::fmt;
use std::path::PathBuf;
use std::time::Duration;
use crate::auth::ApiKey;
@@ -7,6 +9,10 @@ pub struct ClientOptions {
endpoint: String,
api_key: Option<ApiKey>,
plaintext: bool,
ca_file: Option<PathBuf>,
server_name_override: Option<String>,
connect_timeout: Duration,
call_timeout: Duration,
}
impl ClientOptions {
@@ -15,6 +21,10 @@ impl ClientOptions {
endpoint: endpoint.into(),
api_key: None,
plaintext: true,
ca_file: None,
server_name_override: None,
connect_timeout: Duration::from_secs(10),
call_timeout: Duration::from_secs(30),
}
}
@@ -23,6 +33,31 @@ impl ClientOptions {
self
}
pub fn with_plaintext(mut self, plaintext: bool) -> Self {
self.plaintext = plaintext;
self
}
pub fn with_ca_file(mut self, ca_file: impl Into<PathBuf>) -> Self {
self.ca_file = Some(ca_file.into());
self
}
pub fn with_server_name_override(mut self, server_name_override: impl Into<String>) -> Self {
self.server_name_override = Some(server_name_override.into());
self
}
pub fn with_connect_timeout(mut self, connect_timeout: Duration) -> Self {
self.connect_timeout = connect_timeout;
self
}
pub fn with_call_timeout(mut self, call_timeout: Duration) -> Self {
self.call_timeout = call_timeout;
self
}
pub fn endpoint(&self) -> &str {
&self.endpoint
}
@@ -34,6 +69,22 @@ impl ClientOptions {
pub fn plaintext(&self) -> bool {
self.plaintext
}
pub fn ca_file(&self) -> Option<&PathBuf> {
self.ca_file.as_ref()
}
pub fn server_name_override(&self) -> Option<&str> {
self.server_name_override.as_deref()
}
pub fn connect_timeout(&self) -> Duration {
self.connect_timeout
}
pub fn call_timeout(&self) -> Duration {
self.call_timeout
}
}
impl Default for ClientOptions {
@@ -49,6 +100,27 @@ impl fmt::Debug for ClientOptions {
.field("endpoint", &self.endpoint)
.field("api_key", &self.api_key.as_ref().map(|_| "<redacted>"))
.field("plaintext", &self.plaintext)
.field("ca_file", &self.ca_file)
.field("server_name_override", &self.server_name_override)
.field("connect_timeout", &self.connect_timeout)
.field("call_timeout", &self.call_timeout)
.finish()
}
}
#[cfg(test)]
mod tests {
use super::ClientOptions;
use crate::auth::ApiKey;
#[test]
fn debug_redacts_api_key() {
let options =
ClientOptions::new("http://localhost:5000").with_api_key(ApiKey::new("mxgw_secret"));
let debug = format!("{options:?}");
assert!(debug.contains("<redacted>"));
assert!(!debug.contains("mxgw_secret"));
}
}
+222 -3
View File
@@ -1,15 +1,234 @@
use crate::client::{EventStream, GatewayClient};
use crate::error::Error;
use crate::generated::mxaccess_gateway::v1::mx_command::Payload;
use crate::generated::mxaccess_gateway::v1::mx_command_reply;
use crate::generated::mxaccess_gateway::v1::{
AddItem2Command, AddItemCommand, AdviseCommand, CloseSessionRequest, MxCommand, MxCommandKind,
MxCommandReply, MxCommandRequest, MxValue as ProtoMxValue, OpenSessionRequest, RegisterCommand,
StreamEventsRequest, Write2Command, WriteCommand,
};
use crate::value::MxValue;
/// Session identifier returned by the gateway.
#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Clone)]
pub struct Session {
id: String,
client: GatewayClient,
}
impl Session {
pub fn new(id: impl Into<String>) -> Self {
Self { id: id.into() }
pub(crate) fn new(id: impl Into<String>, client: GatewayClient) -> Self {
Self {
id: id.into(),
client,
}
}
pub fn id(&self) -> &str {
&self.id
}
pub async fn open(client: GatewayClient, client_session_name: &str) -> Result<Self, Error> {
client
.open_session(OpenSessionRequest {
client_session_name: client_session_name.to_owned(),
..OpenSessionRequest::default()
})
.await
}
pub async fn close(&self) -> Result<(), Error> {
self.client
.close_session_raw(CloseSessionRequest {
session_id: self.id.clone(),
client_correlation_id: "rust-client-close-session".to_owned(),
})
.await?;
Ok(())
}
pub async fn register(&self, client_name: &str) -> Result<i32, Error> {
let reply = self
.invoke(
MxCommandKind::Register,
Payload::Register(RegisterCommand {
client_name: client_name.to_owned(),
}),
)
.await?;
Ok(register_server_handle(&reply))
}
pub async fn add_item(&self, server_handle: i32, item_definition: &str) -> Result<i32, Error> {
let reply = self
.invoke(
MxCommandKind::AddItem,
Payload::AddItem(AddItemCommand {
server_handle,
item_definition: item_definition.to_owned(),
}),
)
.await?;
Ok(add_item_handle(&reply))
}
pub async fn add_item2(
&self,
server_handle: i32,
item_definition: &str,
item_context: &str,
) -> Result<i32, Error> {
let reply = self
.invoke(
MxCommandKind::AddItem2,
Payload::AddItem2(AddItem2Command {
server_handle,
item_definition: item_definition.to_owned(),
item_context: item_context.to_owned(),
}),
)
.await?;
Ok(add_item2_handle(&reply))
}
pub async fn advise(&self, server_handle: i32, item_handle: i32) -> Result<(), Error> {
self.invoke(
MxCommandKind::Advise,
Payload::Advise(AdviseCommand {
server_handle,
item_handle,
}),
)
.await?;
Ok(())
}
pub async fn write(
&self,
server_handle: i32,
item_handle: i32,
value: MxValue,
user_id: i32,
) -> Result<(), Error> {
self.invoke(
MxCommandKind::Write,
Payload::Write(WriteCommand {
server_handle,
item_handle,
value: Some(value.into_proto()),
user_id,
}),
)
.await?;
Ok(())
}
pub async fn write2(
&self,
server_handle: i32,
item_handle: i32,
value: MxValue,
timestamp_value: MxValue,
user_id: i32,
) -> Result<(), Error> {
self.invoke(
MxCommandKind::Write2,
Payload::Write2(Write2Command {
server_handle,
item_handle,
value: Some(value.into_proto()),
timestamp_value: Some(timestamp_value.into_proto()),
user_id,
}),
)
.await?;
Ok(())
}
pub async fn events(&self) -> Result<EventStream, Error> {
self.events_after(0).await
}
pub async fn events_after(&self, after_worker_sequence: u64) -> Result<EventStream, Error> {
self.client
.stream_events(StreamEventsRequest {
session_id: self.id.clone(),
after_worker_sequence,
})
.await
}
pub async fn invoke_raw(
&self,
kind: MxCommandKind,
payload: Payload,
) -> Result<MxCommandReply, Error> {
self.client
.invoke_raw(self.command_request(kind, payload))
.await
}
pub async fn invoke(
&self,
kind: MxCommandKind,
payload: Payload,
) -> Result<MxCommandReply, Error> {
self.client
.invoke(self.command_request(kind, payload))
.await
}
fn command_request(&self, kind: MxCommandKind, payload: Payload) -> MxCommandRequest {
MxCommandRequest {
session_id: self.id.clone(),
client_correlation_id: format!("rust-client-{}", kind.as_str_name()),
command: Some(MxCommand {
kind: kind as i32,
payload: Some(payload),
}),
}
}
}
fn register_server_handle(reply: &MxCommandReply) -> i32 {
match reply.payload.as_ref() {
Some(mx_command_reply::Payload::Register(register)) => register.server_handle,
_ => reply
.return_value
.as_ref()
.and_then(int32_reply_value)
.unwrap_or_default(),
}
}
fn add_item_handle(reply: &MxCommandReply) -> i32 {
match reply.payload.as_ref() {
Some(mx_command_reply::Payload::AddItem(add_item)) => add_item.item_handle,
_ => reply
.return_value
.as_ref()
.and_then(int32_reply_value)
.unwrap_or_default(),
}
}
fn add_item2_handle(reply: &MxCommandReply) -> i32 {
match reply.payload.as_ref() {
Some(mx_command_reply::Payload::AddItem2(add_item)) => add_item.item_handle,
_ => reply
.return_value
.as_ref()
.and_then(int32_reply_value)
.unwrap_or_default(),
}
}
fn int32_reply_value(value: &ProtoMxValue) -> Option<i32> {
match value.kind.as_ref()? {
crate::generated::mxaccess_gateway::v1::mx_value::Kind::Int32Value(value) => Some(*value),
_ => None,
}
}
+236 -6
View File
@@ -1,9 +1,239 @@
use crate::generated::mxaccess_gateway::v1::MxValue;
use crate::generated::mxaccess_gateway::v1::mx_array::Values;
use crate::generated::mxaccess_gateway::v1::mx_value::Kind;
use crate::generated::mxaccess_gateway::v1::{
BoolArray, DoubleArray, FloatArray, Int32Array, Int64Array, MxArray, MxDataType,
MxStatusCategory, MxStatusProxy, MxStatusSource, MxValue as ProtoMxValue, RawArray,
StringArray, TimestampArray,
};
pub fn int32_value(value: i32) -> MxValue {
MxValue {
data_type: crate::generated::mxaccess_gateway::v1::MxDataType::Integer as i32,
kind: Some(crate::generated::mxaccess_gateway::v1::mx_value::Kind::Int32Value(value)),
..MxValue::default()
#[derive(Clone, Debug, PartialEq)]
pub struct MxValue {
raw: ProtoMxValue,
projection: MxValueProjection,
}
impl MxValue {
pub fn from_proto(raw: ProtoMxValue) -> Self {
let projection = MxValueProjection::from_proto(&raw);
Self { raw, projection }
}
pub fn bool(value: bool) -> Self {
Self::from_proto(ProtoMxValue {
data_type: MxDataType::Boolean as i32,
variant_type: "VT_BOOL".to_owned(),
kind: Some(Kind::BoolValue(value)),
..ProtoMxValue::default()
})
}
pub fn int32(value: i32) -> Self {
Self::from_proto(ProtoMxValue {
data_type: MxDataType::Integer as i32,
variant_type: "VT_I4".to_owned(),
kind: Some(Kind::Int32Value(value)),
..ProtoMxValue::default()
})
}
pub fn int64(value: i64) -> Self {
Self::from_proto(ProtoMxValue {
data_type: MxDataType::Integer as i32,
variant_type: "VT_I8".to_owned(),
kind: Some(Kind::Int64Value(value)),
..ProtoMxValue::default()
})
}
pub fn float(value: f32) -> Self {
Self::from_proto(ProtoMxValue {
data_type: MxDataType::Float as i32,
variant_type: "VT_R4".to_owned(),
kind: Some(Kind::FloatValue(value)),
..ProtoMxValue::default()
})
}
pub fn double(value: f64) -> Self {
Self::from_proto(ProtoMxValue {
data_type: MxDataType::Double as i32,
variant_type: "VT_R8".to_owned(),
kind: Some(Kind::DoubleValue(value)),
..ProtoMxValue::default()
})
}
pub fn string(value: impl Into<String>) -> Self {
Self::from_proto(ProtoMxValue {
data_type: MxDataType::String as i32,
variant_type: "VT_BSTR".to_owned(),
kind: Some(Kind::StringValue(value.into())),
..ProtoMxValue::default()
})
}
pub fn raw(&self) -> &ProtoMxValue {
&self.raw
}
pub fn projection(&self) -> &MxValueProjection {
&self.projection
}
pub fn into_proto(self) -> ProtoMxValue {
self.raw
}
}
impl From<MxValue> for ProtoMxValue {
fn from(value: MxValue) -> Self {
value.into_proto()
}
}
impl From<ProtoMxValue> for MxValue {
fn from(value: ProtoMxValue) -> Self {
Self::from_proto(value)
}
}
#[derive(Clone, Debug, PartialEq)]
pub enum MxValueProjection {
Unset,
Null,
Bool(bool),
Int32(i32),
Int64(i64),
Float(f32),
Double(f64),
String(String),
Timestamp(prost_types::Timestamp),
Array(MxArrayValue),
Raw(Vec<u8>),
}
impl MxValueProjection {
fn from_proto(value: &ProtoMxValue) -> Self {
if value.is_null {
return Self::Null;
}
match value.kind.as_ref() {
Some(Kind::BoolValue(value)) => Self::Bool(*value),
Some(Kind::Int32Value(value)) => Self::Int32(*value),
Some(Kind::Int64Value(value)) => Self::Int64(*value),
Some(Kind::FloatValue(value)) => Self::Float(*value),
Some(Kind::DoubleValue(value)) => Self::Double(*value),
Some(Kind::StringValue(value)) => Self::String(value.clone()),
Some(Kind::TimestampValue(value)) => Self::Timestamp(*value),
Some(Kind::ArrayValue(value)) => Self::Array(MxArrayValue::from_proto(value.clone())),
Some(Kind::RawValue(value)) => Self::Raw(value.clone()),
None => Self::Unset,
}
}
}
#[derive(Clone, Debug, PartialEq)]
pub struct MxArrayValue {
raw: MxArray,
projection: MxArrayProjection,
}
impl MxArrayValue {
pub fn from_proto(raw: MxArray) -> Self {
let projection = MxArrayProjection::from_proto(&raw);
Self { raw, projection }
}
pub fn string(values: Vec<String>) -> Self {
Self::from_proto(MxArray {
element_data_type: MxDataType::String as i32,
variant_type: "VT_ARRAY|VT_BSTR".to_owned(),
dimensions: vec![values.len() as u32],
values: Some(Values::StringValues(StringArray { values })),
..MxArray::default()
})
}
pub fn raw(&self) -> &MxArray {
&self.raw
}
pub fn projection(&self) -> &MxArrayProjection {
&self.projection
}
}
#[derive(Clone, Debug, PartialEq)]
pub enum MxArrayProjection {
Unset,
Bool(Vec<bool>),
Int32(Vec<i32>),
Int64(Vec<i64>),
Float(Vec<f32>),
Double(Vec<f64>),
String(Vec<String>),
Timestamp(Vec<prost_types::Timestamp>),
Raw(Vec<Vec<u8>>),
}
impl MxArrayProjection {
fn from_proto(array: &MxArray) -> Self {
match array.values.as_ref() {
Some(Values::BoolValues(BoolArray { values })) => Self::Bool(values.clone()),
Some(Values::Int32Values(Int32Array { values })) => Self::Int32(values.clone()),
Some(Values::Int64Values(Int64Array { values })) => Self::Int64(values.clone()),
Some(Values::FloatValues(FloatArray { values })) => Self::Float(values.clone()),
Some(Values::DoubleValues(DoubleArray { values })) => Self::Double(values.clone()),
Some(Values::StringValues(StringArray { values })) => Self::String(values.clone()),
Some(Values::TimestampValues(TimestampArray { values })) => {
Self::Timestamp(values.clone())
}
Some(Values::RawValues(RawArray { values })) => Self::Raw(values.clone()),
None => Self::Unset,
}
}
}
#[derive(Clone, Debug, PartialEq)]
pub struct MxStatus {
raw: MxStatusProxy,
}
impl MxStatus {
pub fn from_proto(raw: MxStatusProxy) -> Self {
Self { raw }
}
pub fn raw(&self) -> &MxStatusProxy {
&self.raw
}
pub fn success(&self) -> i32 {
self.raw.success
}
pub fn category(&self) -> Option<MxStatusCategory> {
MxStatusCategory::try_from(self.raw.category).ok()
}
pub fn detected_by(&self) -> Option<MxStatusSource> {
MxStatusSource::try_from(self.raw.detected_by).ok()
}
pub fn detail(&self) -> i32 {
self.raw.detail
}
pub fn raw_category(&self) -> i32 {
self.raw.raw_category
}
pub fn raw_detected_by(&self) -> i32 {
self.raw.raw_detected_by
}
pub fn diagnostic_text(&self) -> &str {
&self.raw.diagnostic_text
}
}
+398
View File
@@ -0,0 +1,398 @@
use std::pin::Pin;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use std::task::{Context, Poll};
use std::time::Duration;
use futures_core::Stream;
use futures_util::StreamExt;
use mxgateway_client::generated::mxaccess_gateway::v1::mx_access_gateway_server::{
MxAccessGateway, MxAccessGatewayServer,
};
use mxgateway_client::generated::mxaccess_gateway::v1::mx_command_reply;
use mxgateway_client::generated::mxaccess_gateway::v1::mx_value::Kind;
use mxgateway_client::generated::mxaccess_gateway::v1::{
AddItemReply, CloseSessionReply, CloseSessionRequest, MxCommandKind, MxCommandReply,
MxDataType, MxEvent, MxEventFamily, MxStatusCategory, MxStatusProxy, MxStatusSource, MxValue,
OpenSessionReply, OpenSessionRequest, ProtocolStatus, ProtocolStatusCode, SessionState,
StreamEventsRequest,
};
use mxgateway_client::{
ApiKey, ClientOptions, CommandError, Error, GatewayClient, MxStatus, MxValue as ClientMxValue,
MxValueProjection,
};
use serde_json::Value;
use tokio::net::TcpListener;
use tokio::sync::{mpsc, Mutex};
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
use tonic::transport::Server;
use tonic::{Request, Response, Status};
#[tokio::test]
async fn fake_server_receives_bearer_metadata_and_raw_client_is_reachable() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let mut client = GatewayClient::connect(
ClientOptions::new(endpoint).with_api_key(ApiKey::new("mxgw_fixture_secret")),
)
.await
.unwrap();
let _raw = client.raw_client();
let session = client
.open_session(OpenSessionRequest {
client_session_name: "rust-test".to_owned(),
..OpenSessionRequest::default()
})
.await
.unwrap();
assert_eq!(session.id(), "session-fixture");
assert_eq!(
state.authorization.lock().await.as_deref(),
Some("Bearer mxgw_fixture_secret")
);
}
#[tokio::test]
async fn session_helpers_build_commands_and_preserve_command_errors() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let item_handle = session.add_item(12, "Plant.Area.Tag").await.unwrap();
assert_eq!(item_handle, 34);
let last_command = state.last_command_kind.lock().await;
assert_eq!(*last_command, Some(MxCommandKind::AddItem as i32));
drop(last_command);
let error = session
.write(12, 34, ClientMxValue::int32(123), 0)
.await
.unwrap_err();
let Error::Command(error) = error else {
panic!("write failure should preserve the raw command reply: {error:?}");
};
assert_eq!(
error.reply().protocol_status.as_ref().unwrap().code,
ProtocolStatusCode::MxaccessFailure as i32
);
assert_eq!(error.reply().hresult, Some(-2147220992));
assert_eq!(error.reply().statuses.len(), 2);
}
#[tokio::test]
async fn event_stream_preserves_order_and_drop_cancels_server_stream() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let mut stream = client
.stream_events(StreamEventsRequest {
session_id: "session-fixture".to_owned(),
after_worker_sequence: 0,
})
.await
.unwrap();
assert_eq!(stream.next().await.unwrap().unwrap().worker_sequence, 1);
assert_eq!(stream.next().await.unwrap().unwrap().worker_sequence, 2);
drop(stream);
for _ in 0..20 {
if state.stream_dropped.load(Ordering::SeqCst) {
return;
}
tokio::time::sleep(Duration::from_millis(25)).await;
}
assert!(state.stream_dropped.load(Ordering::SeqCst));
}
#[test]
fn value_conversion_fixtures_keep_typed_projection_and_raw_metadata() {
let fixture = behavior_fixture("values/value-conversion-cases.json");
let cases = fixture["cases"].as_array().unwrap();
let int64_case = case_by_id(cases, "int64.large");
let int64_value = ClientMxValue::from_proto(MxValue {
data_type: MxDataType::Integer as i32,
variant_type: "VT_I8".to_owned(),
kind: Some(Kind::Int64Value(
int64_case["value"]["int64Value"]
.as_str()
.unwrap()
.parse()
.unwrap(),
)),
..MxValue::default()
});
assert_eq!(
int64_value.projection(),
&MxValueProjection::Int64(9_223_372_036_854_770_000)
);
let raw_case = case_by_id(cases, "raw-fallback.variant");
let raw_value = ClientMxValue::from_proto(MxValue {
data_type: MxDataType::Unknown as i32,
variant_type: "VT_RECORD".to_owned(),
raw_diagnostic: raw_case["value"]["rawDiagnostic"]
.as_str()
.unwrap()
.to_owned(),
raw_data_type: raw_case["value"]["rawDataType"].as_i64().unwrap() as i32,
kind: Some(Kind::RawValue(vec![1, 2, 3, 4, 5])),
..MxValue::default()
});
assert_eq!(
raw_value.projection(),
&MxValueProjection::Raw(vec![1, 2, 3, 4, 5])
);
assert_eq!(raw_value.raw().raw_data_type, 32767);
assert!(raw_value.raw().raw_diagnostic.contains("No lossless"));
}
#[test]
fn status_conversion_fixtures_preserve_raw_fields() {
let fixture = behavior_fixture("statuses/status-conversion-cases.json");
let cases = fixture["cases"].as_array().unwrap();
let raw_case = case_by_id(cases, "raw-unknown-category");
let status = MxStatus::from_proto(MxStatusProxy {
success: raw_case["status"]["success"].as_i64().unwrap() as i32,
category: MxStatusCategory::Unknown as i32,
detected_by: MxStatusSource::Unknown as i32,
detail: raw_case["status"]["detail"].as_i64().unwrap() as i32,
raw_category: raw_case["status"]["rawCategory"].as_i64().unwrap() as i32,
raw_detected_by: raw_case["status"]["rawDetectedBy"].as_i64().unwrap() as i32,
diagnostic_text: raw_case["status"]["diagnosticText"]
.as_str()
.unwrap()
.to_owned(),
});
assert_eq!(status.success(), 0);
assert_eq!(status.category(), Some(MxStatusCategory::Unknown));
assert_eq!(status.raw_category(), 99);
assert_eq!(status.raw_detected_by(), 77);
assert!(status.diagnostic_text().contains("preserved"));
}
#[test]
fn authentication_and_authorization_statuses_are_distinct_and_redacted() {
let auth = Error::from(Status::unauthenticated(
"invalid API key mxgw_visible_secret",
));
let denied = Error::from(Status::permission_denied("missing scope mxaccess.write"));
assert!(matches!(auth, Error::Authentication { .. }));
assert!(matches!(denied, Error::Authorization { .. }));
assert!(!auth.to_string().contains("visible_secret"));
}
#[test]
fn command_error_display_keeps_raw_reply_accessible() {
let reply = mxaccess_failure_reply();
let error = CommandError::new(reply.clone());
assert_eq!(error.reply().hresult, Some(-2147220992));
assert!(error.to_string().contains("MxaccessFailure"));
}
#[derive(Default)]
struct FakeState {
authorization: Mutex<Option<String>>,
last_command_kind: Mutex<Option<i32>>,
stream_dropped: Arc<AtomicBool>,
}
#[derive(Clone)]
struct FakeGateway {
state: Arc<FakeState>,
}
#[tonic::async_trait]
impl MxAccessGateway for FakeGateway {
async fn open_session(
&self,
request: Request<OpenSessionRequest>,
) -> Result<Response<OpenSessionReply>, Status> {
*self.state.authorization.lock().await = request
.metadata()
.get("authorization")
.and_then(|value| value.to_str().ok())
.map(str::to_owned);
Ok(Response::new(OpenSessionReply {
session_id: "session-fixture".to_owned(),
backend_name: "fake".to_owned(),
worker_process_id: 1234,
worker_protocol_version: 1,
gateway_protocol_version: 1,
protocol_status: Some(ok_status("opened")),
..OpenSessionReply::default()
}))
}
async fn close_session(
&self,
request: Request<CloseSessionRequest>,
) -> Result<Response<CloseSessionReply>, Status> {
Ok(Response::new(CloseSessionReply {
session_id: request.into_inner().session_id,
final_state: SessionState::Closed as i32,
protocol_status: Some(ok_status("closed")),
}))
}
async fn invoke(
&self,
request: Request<mxgateway_client::generated::mxaccess_gateway::v1::MxCommandRequest>,
) -> Result<Response<MxCommandReply>, Status> {
let request = request.into_inner();
let kind = request
.command
.as_ref()
.map(|command| command.kind)
.unwrap_or_default();
*self.state.last_command_kind.lock().await = Some(kind);
if kind == MxCommandKind::Write as i32 {
return Ok(Response::new(mxaccess_failure_reply()));
}
Ok(Response::new(MxCommandReply {
session_id: request.session_id,
correlation_id: "fake-correlation".to_owned(),
kind,
protocol_status: Some(ok_status("command ok")),
payload: Some(mx_command_reply::Payload::AddItem(AddItemReply {
item_handle: 34,
})),
..MxCommandReply::default()
}))
}
type StreamEventsStream = DropAwareStream;
async fn stream_events(
&self,
_request: Request<StreamEventsRequest>,
) -> Result<Response<Self::StreamEventsStream>, Status> {
let (sender, receiver) = mpsc::channel(4);
sender.send(Ok(event(1))).await.unwrap();
sender.send(Ok(event(2))).await.unwrap();
Ok(Response::new(DropAwareStream {
inner: ReceiverStream::new(receiver),
dropped: self.state.stream_dropped.clone(),
}))
}
}
struct DropAwareStream {
inner: ReceiverStream<Result<MxEvent, Status>>,
dropped: Arc<AtomicBool>,
}
impl Stream for DropAwareStream {
type Item = Result<MxEvent, Status>;
fn poll_next(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.inner).poll_next(context)
}
}
impl Drop for DropAwareStream {
fn drop(&mut self) {
self.dropped.store(true, Ordering::SeqCst);
}
}
async fn spawn_fake_gateway(state: Arc<FakeState>) -> String {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let address = listener.local_addr().unwrap();
let incoming = TcpListenerStream::new(listener);
let service = MxAccessGatewayServer::new(FakeGateway { state });
tokio::spawn(async move {
Server::builder()
.add_service(service)
.serve_with_incoming(incoming)
.await
.unwrap();
});
format!("http://{address}")
}
fn ok_status(message: &str) -> ProtocolStatus {
ProtocolStatus {
code: ProtocolStatusCode::Ok as i32,
message: message.to_owned(),
}
}
fn mxaccess_failure_reply() -> MxCommandReply {
MxCommandReply {
session_id: "session-fixture".to_owned(),
correlation_id: "gateway-correlation-write-1".to_owned(),
kind: MxCommandKind::Write as i32,
protocol_status: Some(ProtocolStatus {
code: ProtocolStatusCode::MxaccessFailure as i32,
message: "MXAccess rejected the write.".to_owned(),
}),
hresult: Some(-2147220992),
statuses: vec![
MxStatusProxy {
success: 0,
category: MxStatusCategory::SecurityError as i32,
detected_by: MxStatusSource::RespondingLmx as i32,
detail: 321,
raw_category: 8,
raw_detected_by: 3,
diagnostic_text: "Write denied by provider security.".to_owned(),
},
MxStatusProxy {
success: 0,
category: MxStatusCategory::OperationalError as i32,
detected_by: MxStatusSource::RespondingNmx as i32,
detail: 902,
raw_category: 7,
raw_detected_by: 5,
diagnostic_text: "Provider rejected the item state.".to_owned(),
},
],
..MxCommandReply::default()
}
}
fn event(sequence: u64) -> MxEvent {
MxEvent {
family: MxEventFamily::OnDataChange as i32,
session_id: "session-fixture".to_owned(),
worker_sequence: sequence,
..MxEvent::default()
}
}
fn behavior_fixture(path: &str) -> Value {
let path = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
.join("../proto/fixtures/behavior")
.join(path);
let data = std::fs::read_to_string(&path).unwrap();
serde_json::from_str(&data).unwrap()
}
fn case_by_id<'a>(cases: &'a [Value], id: &str) -> &'a Value {
cases
.iter()
.find(|case| case["id"].as_str() == Some(id))
.unwrap_or_else(|| panic!("missing fixture case {id}"))
}
+8
View File
@@ -76,6 +76,13 @@ stdout/stderr lines emitted during the run.
## Focused Commands
Run the parity fixture matrix tests after changing the integration parity
scenario list:
```bash
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter FullyQualifiedName~ParityFixtureMatrixTests
```
Run the fake worker tests after changing gateway worker IPC, session startup, or
event streaming behavior:
@@ -95,6 +102,7 @@ dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj
## Related Documentation
- [Parity Fixture Matrix](./ParityFixtureMatrix.md)
- [Gateway Process Design](./gateway-process-design.md)
- [Worker Frame Protocol](./WorkerFrameProtocol.md)
- [MXAccess Worker Instance Detailed Design](./mxaccess-worker-instance-design.md)
+102
View File
@@ -0,0 +1,102 @@
# Parity Fixture Matrix
The parity fixture matrix defines the live-test scenarios used to compare
direct MXAccess behavior with the gateway-backed worker. It is a planning and
validation fixture, not a source of synthetic MXAccess behavior.
The matrix lives in
`clients/proto/fixtures/parity/parity-fixture-matrix.json`. It references the
local MXAccess capture set under
`C:/Users/dohertj2/Desktop/mxaccess/captures` and keeps capture paths relative
to that root so the repository does not copy raw capture artifacts.
## Scope
The matrix covers every public `LMXProxyServerClass` method represented by the
gateway contract:
- `Register`
- `Unregister`
- `AddItem`
- `AddItem2`
- `RemoveItem`
- `Advise`
- `UnAdvise`
- `AdviseSupervisory`
- `AddBufferedItem`
- `SetBufferedUpdateInterval`
- `Suspend`
- `Activate`
- `Write`
- `Write2`
- `WriteSecured`
- `WriteSecured2`
- `AuthenticateUser`
- `ArchestrAUserToId`
Each entry is either a `planned_fixture` or a `documented_gap`.
`WriteSecured` remains a documented gap because the current captures show
`0x80004021` before MXAccess emits a value-bearing write body.
`OperationComplete` and public `OnBufferedDataChange` batches also remain
documented gaps because no capture in the current set proves those public event
payloads from native MXAccess.
## Required Scenario Groups
The matrix pins the high-risk parity scenarios from the integration milestone:
| Scenario | Purpose |
|----------|---------|
| `invalid_handles` | Preserves invalid server, item, post-remove, and invalid-reference HRESULT/status behavior. |
| `write_statuses` | Compares successful writes, wrong-type writes, invalid references, arrays, and write-complete status arrays. |
| `secured_writes` | Covers observed `WriteSecured` rejection and authenticated `WriteSecured2` paths without logging credential-bearing values. |
| `add_item_context` | Ensures `AddItem2` and buffered registration pass context strings exactly as supplied. |
| `buffered_registration` | Tracks buffered item registration and interval setup separately from normal advice. |
## Comparison Format
Each live parity fixture should record one direct MXAccess result and one
gateway result for the same operation.
Direct MXAccess records include:
- method name,
- arguments after redaction,
- returned value,
- HRESULT,
- exception type,
- `MXSTATUS_PROXY[]` values,
- native event records in observed order.
Gateway records include:
- `MxCommandKind`,
- `ProtocolStatus`,
- `MxCommandReply.ReturnValue`,
- `MxCommandReply.Hresult`,
- repeated `MxCommandReply.Statuses`,
- safe diagnostic message,
- streamed `MxEvent` records in worker-sequence order.
Compare HRESULT, exception type, returned value, status array shape, raw status
fields, event family order, event payload shape, value projection, and raw
fallback metadata. The gateway must not convert an MXAccess command failure
into a transport failure when the worker captured HRESULT or status details.
## Validation
Run the parity fixture matrix tests after changing the matrix:
```bash
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter FullyQualifiedName~ParityFixtureMatrixTests
```
Live MXAccess execution remains opt-in. The matrix defines which scenarios to
run when the installed MXAccess COM component and provider state are available;
normal unit tests only validate the repository fixture shape.
## Related Documentation
- [Gateway Testing](./GatewayTesting.md)
- [MXAccess Worker Instance Detailed Design](./mxaccess-worker-instance-design.md)
- [Protobuf Contracts](./Contracts.md)
+11 -3
View File
@@ -137,9 +137,17 @@ The Python scaffold provides a repo-local generation script:
clients/python/generate-proto.ps1
```
Java clients should use the Gradle protobuf plugin and write generated sources
under `clients/java/src/main/generated`. The Java client scaffold owns the
Gradle plugin versions and source-set wiring.
Java clients use the Gradle protobuf plugin from `clients/java`. The
`mxgateway-client` project reads the shared `.proto` files and writes generated
Java protobuf and gRPC sources under `clients/java/src/main/generated`, matching
the manifest output path. Handwritten client and CLI code stays in the
`mxgateway-client` and `mxgateway-cli` project source trees.
Run the Java workspace checks from `clients/java`:
```powershell
gradle test
```
## Golden Fixtures
+15
View File
@@ -17,6 +17,7 @@ Recommended Gradle multi-project layout:
clients/java/
settings.gradle
build.gradle
src/main/generated/
mxgateway-client/
build.gradle
src/main/java/com/dohertylan/mxgateway/client/
@@ -31,6 +32,7 @@ Alternative Maven layout is acceptable if the repo standardizes on Maven.
Target Java:
- Java 21 recommended.
- The Gradle scaffold uses the Java 21 toolchain for compilation and tests.
Expected dependencies:
@@ -189,3 +191,16 @@ Publish library and CLI separately:
Generated protobuf code should be produced during the build from shared proto
files and should not be hand-edited.
## Current Build
Run the Java scaffold checks from `clients/java`:
```powershell
gradle test
```
The `mxgateway-client` project generates the gateway and worker protobuf/gRPC
bindings into `src/main/generated`, compiles the generated contracts, and runs
JUnit 5 tests. The `mxgateway-cli` project builds a Picocli-based `mxgw-java`
entry point for later command implementation.
@@ -0,0 +1,293 @@
using System.Text.Json;
using MxGateway.Contracts;
namespace MxGateway.Tests.Contracts;
public sealed class ParityFixtureMatrixTests
{
[Fact]
public void Matrix_DeclaresCurrentProtocolVersionsAndComparisonFields()
{
using JsonDocument matrix = LoadParityMatrix();
JsonElement root = matrix.RootElement;
Assert.Equal(1, root.GetProperty("schemaVersion").GetInt32());
Assert.Equal("mxaccess-gateway-parity-fixture-matrix", root.GetProperty("fixtureSet").GetString());
Assert.Equal(GatewayContractInfo.GatewayProtocolVersion, root.GetProperty("gatewayProtocolVersion").GetUInt32());
Assert.Equal(GatewayContractInfo.WorkerProtocolVersion, root.GetProperty("workerProtocolVersion").GetUInt32());
JsonElement comparisonFormat = root.GetProperty("comparisonFormat");
AssertRequiredFields(
comparisonFormat.GetProperty("directMxAccess").GetProperty("requiredFields"),
"method",
"arguments",
"returnedValue",
"hresult",
"statuses",
"events");
AssertRequiredFields(
comparisonFormat.GetProperty("gatewayResult").GetProperty("requiredFields"),
"kind",
"protocolStatus",
"returnValue",
"hresult",
"statuses",
"events");
AssertRequiredFields(
comparisonFormat.GetProperty("eventFields"),
"family",
"value",
"quality",
"sourceTimestamp",
"statuses",
"workerSequence");
AssertRequiredFields(
comparisonFormat.GetProperty("comparisonKeys"),
"hresult",
"statusArrayShape",
"statusRawFields",
"eventFamilyOrder",
"eventPayloadShape",
"valueProjection",
"rawFallbackMetadata");
}
[Fact]
public void Matrix_CoversEveryPublicMxAccessMethod()
{
using JsonDocument matrix = LoadParityMatrix();
JsonElement methodFixtures = matrix.RootElement.GetProperty("methodFixtures");
Dictionary<string, JsonElement> fixturesByMethod = [];
HashSet<string> ids = new(StringComparer.Ordinal);
foreach (JsonElement fixture in methodFixtures.EnumerateArray())
{
string id = fixture.GetProperty("id").GetString()!;
string method = fixture.GetProperty("method").GetString()!;
string commandKind = fixture.GetProperty("commandKind").GetString()!;
string status = fixture.GetProperty("status").GetString()!;
Assert.True(ids.Add(id), $"Duplicate parity fixture id '{id}'.");
Assert.True(fixturesByMethod.TryAdd(method, fixture), $"Duplicate parity method '{method}'.");
Assert.StartsWith("MX_COMMAND_KIND_", commandKind, StringComparison.Ordinal);
Assert.Contains(status, KnownFixtureStatuses);
Assert.NotEmpty(fixture.GetProperty("assertions").EnumerateArray());
AssertCaptureReferencesAreRelative(fixture.GetProperty("captureReferences"));
}
Assert.Equal(ExpectedPublicMethods.Order(StringComparer.Ordinal), fixturesByMethod.Keys.Order(StringComparer.Ordinal));
foreach (string method in ExpectedPublicMethods)
{
JsonElement fixture = fixturesByMethod[method];
string status = fixture.GetProperty("status").GetString()!;
Assert.True(
status == "planned_fixture" || status == "documented_gap",
$"Method '{method}' must have a planned parity fixture or documented gap.");
}
}
[Fact]
public void Matrix_CoversRequiredParityScenarioGroups()
{
using JsonDocument matrix = LoadParityMatrix();
HashSet<string> knownFixtureIds = GetFixtureIds(matrix.RootElement);
Dictionary<string, JsonElement> groupsById = [];
foreach (JsonElement group in matrix.RootElement.GetProperty("scenarioGroups").EnumerateArray())
{
string id = group.GetProperty("id").GetString()!;
Assert.True(groupsById.TryAdd(id, group), $"Duplicate parity scenario group '{id}'.");
Assert.NotEmpty(group.GetProperty("description").GetString()!);
Assert.NotEmpty(group.GetProperty("fixtureIds").EnumerateArray());
AssertCaptureReferencesAreRelative(group.GetProperty("captureReferences"));
foreach (JsonElement fixtureIdElement in group.GetProperty("fixtureIds").EnumerateArray())
{
string fixtureId = fixtureIdElement.GetString()!;
Assert.Contains(fixtureId, knownFixtureIds);
}
}
foreach (string requiredGroup in RequiredScenarioGroups)
{
Assert.True(groupsById.ContainsKey(requiredGroup), $"Missing required parity scenario group '{requiredGroup}'.");
}
AssertScenarioCovers(groupsById["invalid_handles"], "method.remove-item.basic", "method.write.value-status-matrix");
AssertScenarioCovers(groupsById["write_statuses"], "method.write.value-status-matrix", "event.on-write-complete.status");
AssertScenarioCovers(groupsById["secured_writes"], "method.write-secured.rejection-gap", "method.write-secured2.authenticated");
AssertScenarioCovers(groupsById["add_item_context"], "method.add-item2.context", "method.add-buffered-item.context");
AssertScenarioCovers(groupsById["buffered_registration"], "method.add-buffered-item.context", "event.on-buffered-data-change.batch-gap");
}
[Fact]
public void Matrix_CoversEveryPublicMxAccessEventFamily()
{
using JsonDocument matrix = LoadParityMatrix();
Dictionary<string, JsonElement> fixturesByFamily = [];
foreach (JsonElement fixture in matrix.RootElement.GetProperty("eventFixtures").EnumerateArray())
{
string family = fixture.GetProperty("family").GetString()!;
string status = fixture.GetProperty("status").GetString()!;
Assert.True(fixturesByFamily.TryAdd(family, fixture), $"Duplicate parity event family '{family}'.");
Assert.Contains(status, KnownFixtureStatuses);
Assert.NotEmpty(fixture.GetProperty("assertions").EnumerateArray());
AssertCaptureReferencesAreRelative(fixture.GetProperty("captureReferences"));
}
foreach (string eventFamily in ExpectedEventFamilies)
{
Assert.True(fixturesByFamily.ContainsKey(eventFamily), $"Missing parity fixture for event family '{eventFamily}'.");
}
Assert.Equal("documented_gap", fixturesByFamily["MX_EVENT_FAMILY_OPERATION_COMPLETE"].GetProperty("status").GetString());
Assert.Equal("documented_gap", fixturesByFamily["MX_EVENT_FAMILY_ON_BUFFERED_DATA_CHANGE"].GetProperty("status").GetString());
}
private static readonly string[] ExpectedPublicMethods =
[
"Register",
"Unregister",
"AddItem",
"AddItem2",
"RemoveItem",
"Advise",
"UnAdvise",
"AdviseSupervisory",
"AddBufferedItem",
"SetBufferedUpdateInterval",
"Suspend",
"Activate",
"Write",
"Write2",
"WriteSecured",
"WriteSecured2",
"AuthenticateUser",
"ArchestrAUserToId",
];
private static readonly string[] ExpectedEventFamilies =
[
"MX_EVENT_FAMILY_ON_DATA_CHANGE",
"MX_EVENT_FAMILY_ON_WRITE_COMPLETE",
"MX_EVENT_FAMILY_OPERATION_COMPLETE",
"MX_EVENT_FAMILY_ON_BUFFERED_DATA_CHANGE",
];
private static readonly string[] RequiredScenarioGroups =
[
"invalid_handles",
"write_statuses",
"secured_writes",
"add_item_context",
"buffered_registration",
];
private static readonly string[] KnownFixtureStatuses =
[
"planned_fixture",
"documented_gap",
];
private static void AssertRequiredFields(
JsonElement fields,
params string[] expectedFields)
{
HashSet<string> declared = fields
.EnumerateArray()
.Select(field => field.GetString()!)
.ToHashSet(StringComparer.Ordinal);
foreach (string expectedField in expectedFields)
{
Assert.Contains(expectedField, declared);
}
}
private static void AssertCaptureReferencesAreRelative(JsonElement captureReferences)
{
int count = 0;
foreach (JsonElement captureReference in captureReferences.EnumerateArray())
{
string path = captureReference.GetString()!;
Assert.StartsWith("captures/", path, StringComparison.Ordinal);
Assert.DoesNotContain("\\", path, StringComparison.Ordinal);
Assert.False(Path.IsPathRooted(path), $"Capture reference '{path}' must be relative.");
count++;
}
Assert.True(count > 0, "Each parity fixture must reference at least one MXAccess capture.");
}
private static void AssertScenarioCovers(
JsonElement group,
params string[] fixtureIds)
{
HashSet<string> declared = group
.GetProperty("fixtureIds")
.EnumerateArray()
.Select(fixtureId => fixtureId.GetString()!)
.ToHashSet(StringComparer.Ordinal);
foreach (string fixtureId in fixtureIds)
{
Assert.Contains(fixtureId, declared);
}
}
private static HashSet<string> GetFixtureIds(JsonElement root)
{
HashSet<string> ids = new(StringComparer.Ordinal);
foreach (JsonElement fixture in root.GetProperty("methodFixtures").EnumerateArray())
{
ids.Add(fixture.GetProperty("id").GetString()!);
}
foreach (JsonElement fixture in root.GetProperty("eventFixtures").EnumerateArray())
{
ids.Add(fixture.GetProperty("id").GetString()!);
}
return ids;
}
private static JsonDocument LoadParityMatrix()
{
return JsonDocument.Parse(File.ReadAllText(Path.Combine(GetParityFixtureRoot().FullName, "parity-fixture-matrix.json")));
}
private static DirectoryInfo GetParityFixtureRoot()
{
DirectoryInfo repositoryRoot = FindRepositoryRoot();
return new DirectoryInfo(Path.Combine(repositoryRoot.FullName, "clients", "proto", "fixtures", "parity"));
}
private static DirectoryInfo FindRepositoryRoot()
{
DirectoryInfo? current = new(AppContext.BaseDirectory);
while (current is not null)
{
if (File.Exists(Path.Combine(current.FullName, "AGENTS.md"))
&& Directory.Exists(Path.Combine(current.FullName, "src"))
&& Directory.Exists(Path.Combine(current.FullName, "clients")))
{
return current;
}
current = current.Parent;
}
throw new DirectoryNotFoundException("Could not locate the repository root from the test output directory.");
}
}