Compare commits

...

7 Commits

Author SHA1 Message Date
Joseph Doherty d6939432f9 Issue #48: implement Java client session values errors and CLI 2026-04-26 20:59:28 -04:00
dohertj2 02143ef7e2 Merge pull request #98 from agent-2/issue-35-parity-fixture-matrix
Issue #35: add parity fixture matrix
2026-04-26 20:54:24 -04:00
dohertj2 c032852065 Merge pull request #97 from agent-3/issue-46-implement-python-async-client-values-errors-and-cli
Issue #46: implement Python async client values errors and CLI
2026-04-26 20:50:10 -04:00
Joseph Doherty 1d93e77234 Merge remote-tracking branch 'origin/main' into agent-2/issue-35-parity-fixture-matrix 2026-04-26 20:49:43 -04:00
Joseph Doherty 0a670eb381 Issue #35: add parity fixture matrix 2026-04-26 20:47:05 -04:00
Joseph Doherty b57662aae7 Issue #46: implement Python async client values errors and CLI 2026-04-26 20:46:18 -04:00
dohertj2 14afb325c3 Merge pull request #96 from agent-1/issue-47-scaffold-java-gradle-build
Issue #47: scaffold Java Gradle build
2026-04-26 20:42:39 -04:00
43 changed files with 5202 additions and 32 deletions
+60 -5
View File
@@ -1,8 +1,7 @@
# Java Client
The Java client workspace contains the Gradle scaffold for the MXAccess Gateway
client library, generated protobuf/gRPC bindings, a test CLI project, and JUnit
tests.
The Java client workspace contains the MXAccess Gateway client library,
generated protobuf/gRPC bindings, a Picocli test CLI project, and JUnit tests.
## Layout
@@ -20,8 +19,63 @@ clients/java/
generated sources under `src/main/generated`, which matches the client proto
manifest in `../proto/proto-inputs.json`. Do not edit generated files by hand.
`mxgateway-client` exposes `MxGatewayClientOptions`, `MxGatewayClient`,
`MxGatewaySession`, value/status helpers, typed gateway exceptions, raw
generated stubs, and generated protobuf messages for parity tests.
`mxgateway-cli` depends on `mxgateway-client` and provides the `mxgw-java`
application entry point used by later CLI implementation work.
application entry point. The CLI supports version, session, command, event
streaming, write, and smoke-test commands with deterministic JSON output.
## Client Usage
Create a client with explicit transport and auth options:
```java
MxGatewayClientOptions options = MxGatewayClientOptions.builder()
.endpoint("localhost:5000")
.apiKey(System.getenv("MXGATEWAY_API_KEY"))
.plaintext(true)
.build();
try (MxGatewayClient client = MxGatewayClient.connect(options);
MxGatewaySession session = client.openSession("java-client")) {
int serverHandle = session.register("java-client");
int itemHandle = session.addItem(serverHandle, "TestObject.TestInt");
session.advise(serverHandle, itemHandle);
session.write(serverHandle, itemHandle, MxValues.int32Value(123), 0);
}
```
Use `rawBlockingStub`, `rawFutureStub`, `rawAsyncStub`, `openSessionRaw`,
`closeSessionRaw`, `invoke`, and raw session helper methods when tests need the
underlying protobuf messages. `MxGatewayCommandException` and
`MxAccessException` preserve the raw `MxCommandReply` when the gateway returns a
data-bearing MXAccess failure.
`MxEventStream` implements `Iterator<MxEvent>` and `AutoCloseable`. Closing it
cancels the underlying gRPC stream. Canceling or timing out a Java client call
only stops the client from waiting; it does not abort an in-flight MXAccess COM
call on the worker STA.
## CLI Usage
Run the CLI through Gradle:
```powershell
gradle :mxgateway-cli:run --args="version --json"
gradle :mxgateway-cli:run --args="open-session --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --client-session-name java-cli --json"
gradle :mxgateway-cli:run --args="register --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --session-id <id> --client-name java-cli --json"
gradle :mxgateway-cli:run --args="add-item --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --session-id <id> --server-handle 1 --item TestObject.TestInt --json"
gradle :mxgateway-cli:run --args="advise --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --session-id <id> --server-handle 1 --item-handle 1 --json"
gradle :mxgateway-cli:run --args="write --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --session-id <id> --server-handle 1 --item-handle 1 --type int32 --value 123 --json"
gradle :mxgateway-cli:run --args="stream-events --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --session-id <id> --limit 1 --json"
gradle :mxgateway-cli:run --args="smoke --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --item TestObject.TestInt --json"
```
The CLI accepts `--api-key`, `--api-key-env`, `--plaintext`, `--ca-file`,
`--server-name-override`, `--timeout`, and `--json` on gateway commands. JSON
output redacts API keys.
## Build And Test
@@ -32,7 +86,8 @@ gradle test
```
The build uses the Java 21 Gradle toolchain, compiles generated protobuf/gRPC
code, and runs JUnit 5 tests for the scaffold and CLI entry point.
code, and runs JUnit 5 tests for the client wrapper, shared behavior fixtures,
in-process gRPC behavior, stream cancellation, and CLI parser/output behavior.
## Related Documentation
+2
View File
@@ -3,6 +3,8 @@ plugins {
}
ext {
guavaVersion = '33.5.0-jre'
gsonVersion = '2.13.2'
grpcVersion = '1.76.0'
junitVersion = '5.14.1'
picocliVersion = '4.7.7'
+1
View File
@@ -4,6 +4,7 @@ plugins {
dependencies {
implementation project(':mxgateway-client')
implementation "com.google.protobuf:protobuf-java-util:${protobufVersion}"
implementation "info.picocli:picocli:${picocliVersion}"
}
@@ -1,29 +1,61 @@
package com.dohertylan.mxgateway.cli;
import com.dohertylan.mxgateway.client.MxEventStream;
import com.dohertylan.mxgateway.client.MxGatewayClient;
import com.dohertylan.mxgateway.client.MxGatewayClientOptions;
import com.dohertylan.mxgateway.client.MxGatewayClientVersion;
import com.dohertylan.mxgateway.client.MxGatewaySecrets;
import com.dohertylan.mxgateway.client.MxGatewaySession;
import com.dohertylan.mxgateway.client.MxValues;
import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;
import java.io.PrintWriter;
import java.nio.file.Path;
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
import mxaccess_gateway.v1.MxaccessGateway.MxValue;
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Mixin;
import picocli.CommandLine.Model.CommandSpec;
import picocli.CommandLine.Option;
import picocli.CommandLine.Spec;
@Command(
name = "mxgw-java",
mixinStandardHelpOptions = true,
description = "MXAccess Gateway Java test CLI.",
subcommands = MxGatewayCli.VersionCommand.class)
description = "MXAccess Gateway Java test CLI.")
public final class MxGatewayCli implements Callable<Integer> {
private final MxGatewayCliClientFactory clientFactory;
@Spec
private CommandSpec spec;
public MxGatewayCli() {
this(new GrpcMxGatewayCliClientFactory());
}
MxGatewayCli(MxGatewayCliClientFactory clientFactory) {
this.clientFactory = clientFactory;
}
public static void main(String[] args) {
int exitCode = new CommandLine(new MxGatewayCli()).execute(args);
int exitCode = commandLine(new GrpcMxGatewayCliClientFactory()).execute(args);
System.exit(exitCode);
}
public static int execute(PrintWriter out, PrintWriter err, String... args) {
CommandLine commandLine = new CommandLine(new MxGatewayCli());
return execute(new GrpcMxGatewayCliClientFactory(), out, err, args);
}
static int execute(MxGatewayCliClientFactory clientFactory, PrintWriter out, PrintWriter err, String... args) {
CommandLine commandLine = commandLine(clientFactory);
commandLine.setOut(out);
commandLine.setErr(err);
return commandLine.execute(args);
@@ -35,14 +67,42 @@ public final class MxGatewayCli implements Callable<Integer> {
return 0;
}
@Command(name = "version", description = "Prints the Java client scaffold version.")
private static CommandLine commandLine(MxGatewayCliClientFactory clientFactory) {
CommandLine commandLine = new CommandLine(new MxGatewayCli(clientFactory));
commandLine.addSubcommand("version", new VersionCommand());
commandLine.addSubcommand("open-session", new OpenSessionCommand(clientFactory));
commandLine.addSubcommand("close-session", new CloseSessionCommand(clientFactory));
commandLine.addSubcommand("register", new RegisterCommand(clientFactory));
commandLine.addSubcommand("add-item", new AddItemCommand(clientFactory));
commandLine.addSubcommand("advise", new AdviseCommand(clientFactory));
commandLine.addSubcommand("write", new WriteCommand(clientFactory));
commandLine.addSubcommand("stream-events", new StreamEventsCommand(clientFactory));
commandLine.addSubcommand("smoke", new SmokeCommand(clientFactory));
return commandLine;
}
@Command(name = "version", description = "Prints the Java client version.")
public static final class VersionCommand implements Callable<Integer> {
@Spec
private CommandSpec spec;
@Option(names = "--json", description = "Write JSON output.")
private boolean json;
@Override
public Integer call() {
spec.commandLine().getOut().printf(
Map<String, Object> values = new LinkedHashMap<>();
values.put("clientVersion", MxGatewayClientVersion.clientVersion());
values.put("gatewayProtocolVersion", MxGatewayClientVersion.gatewayProtocolVersion());
values.put("workerProtocolVersion", MxGatewayClientVersion.workerProtocolVersion());
if (json) {
spec.commandLine().getOut().println(jsonObject(values));
return 0;
}
spec.commandLine()
.getOut()
.printf(
"mxgateway-java %s gatewayProtocolVersion=%d workerProtocolVersion=%d%n",
MxGatewayClientVersion.clientVersion(),
MxGatewayClientVersion.gatewayProtocolVersion(),
@@ -50,4 +110,535 @@ public final class MxGatewayCli implements Callable<Integer> {
return 0;
}
}
abstract static class GatewayCommand implements Callable<Integer> {
final MxGatewayCliClientFactory clientFactory;
@Mixin
CommonOptions common = new CommonOptions();
@Option(names = "--json", description = "Write JSON output.")
boolean json;
GatewayCommand(MxGatewayCliClientFactory clientFactory) {
this.clientFactory = clientFactory;
}
}
@Command(name = "open-session", description = "Opens a gateway session.")
static final class OpenSessionCommand extends GatewayCommand {
@Option(names = "--client-session-name", description = "Client session name.")
String clientSessionName = "";
@Option(names = "--backend", description = "Requested gateway backend.")
String backend = "";
OpenSessionCommand(MxGatewayCliClientFactory clientFactory) {
super(clientFactory);
}
@Override
public Integer call() {
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
var reply = client.openSession(OpenSessionRequest.newBuilder()
.setClientSessionName(clientSessionName)
.setRequestedBackend(backend)
.build());
writeOutput("open-session", common, json, reply, () -> reply.getSessionId());
}
return 0;
}
}
@Command(name = "close-session", description = "Closes a gateway session.")
static final class CloseSessionCommand extends GatewayCommand {
@Option(names = "--session-id", required = true, description = "Gateway session id.")
String sessionId;
CloseSessionCommand(MxGatewayCliClientFactory clientFactory) {
super(clientFactory);
}
@Override
public Integer call() {
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
var reply = client.closeSession(CloseSessionRequest.newBuilder()
.setSessionId(sessionId)
.build());
writeOutput("close-session", common, json, reply, () -> reply.getFinalState().name());
}
return 0;
}
}
@Command(name = "register", description = "Invokes MXAccess Register.")
static final class RegisterCommand extends GatewayCommand {
@Option(names = "--session-id", required = true, description = "Gateway session id.")
String sessionId;
@Option(names = "--client-name", required = true, description = "MXAccess client name.")
String clientName;
RegisterCommand(MxGatewayCliClientFactory clientFactory) {
super(clientFactory);
}
@Override
public Integer call() {
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
MxCommandReply reply = client.session(sessionId).registerRaw(clientName);
writeOutput("register", common, json, reply, () -> reply.getKind().name());
}
return 0;
}
}
@Command(name = "add-item", description = "Invokes MXAccess AddItem.")
static final class AddItemCommand extends GatewayCommand {
@Option(names = "--session-id", required = true, description = "Gateway session id.")
String sessionId;
@Option(names = "--server-handle", required = true, description = "MXAccess server handle.")
int serverHandle;
@Option(names = "--item", required = true, description = "Item definition.")
String item;
AddItemCommand(MxGatewayCliClientFactory clientFactory) {
super(clientFactory);
}
@Override
public Integer call() {
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
MxCommandReply reply = client.session(sessionId).addItemRaw(serverHandle, item);
writeOutput("add-item", common, json, reply, () -> reply.getKind().name());
}
return 0;
}
}
@Command(name = "advise", description = "Invokes MXAccess Advise.")
static final class AdviseCommand extends GatewayCommand {
@Option(names = "--session-id", required = true, description = "Gateway session id.")
String sessionId;
@Option(names = "--server-handle", required = true, description = "MXAccess server handle.")
int serverHandle;
@Option(names = "--item-handle", required = true, description = "MXAccess item handle.")
int itemHandle;
AdviseCommand(MxGatewayCliClientFactory clientFactory) {
super(clientFactory);
}
@Override
public Integer call() {
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
MxCommandReply reply = client.session(sessionId).adviseRaw(serverHandle, itemHandle);
writeOutput("advise", common, json, reply, () -> reply.getKind().name());
}
return 0;
}
}
@Command(name = "write", description = "Invokes MXAccess Write.")
static final class WriteCommand extends GatewayCommand {
@Option(names = "--session-id", required = true, description = "Gateway session id.")
String sessionId;
@Option(names = "--server-handle", required = true, description = "MXAccess server handle.")
int serverHandle;
@Option(names = "--item-handle", required = true, description = "MXAccess item handle.")
int itemHandle;
@Option(names = "--type", defaultValue = "string", description = "Value type.")
String type;
@Option(names = "--value", required = true, description = "Value text.")
String value;
@Option(names = "--user-id", defaultValue = "0", description = "MXAccess user id.")
int userId;
WriteCommand(MxGatewayCliClientFactory clientFactory) {
super(clientFactory);
}
@Override
public Integer call() {
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
MxCommandReply reply =
client.session(sessionId).writeRaw(serverHandle, itemHandle, parseValue(type, value), userId);
writeOutput("write", common, json, reply, () -> reply.getKind().name());
}
return 0;
}
}
@Command(name = "stream-events", description = "Streams gateway events.")
static final class StreamEventsCommand extends GatewayCommand {
@Option(names = "--session-id", required = true, description = "Gateway session id.")
String sessionId;
@Option(names = "--after-worker-sequence", defaultValue = "0", description = "Starting worker sequence.")
long afterWorkerSequence;
@Option(names = "--limit", defaultValue = "0", description = "Maximum events to print.")
int limit;
StreamEventsCommand(MxGatewayCliClientFactory clientFactory) {
super(clientFactory);
}
@Override
public Integer call() {
try (MxGatewayCliClient client = clientFactory.connect(common.resolved());
MxEventStream events = client.session(sessionId).streamEventsAfter(afterWorkerSequence)) {
int count = 0;
while (events.hasNext()) {
MxEvent event = events.next();
if (json) {
client.out().println(protoJson(event));
} else {
client.out().printf("%d %s%n", event.getWorkerSequence(), event.getFamily());
}
count++;
if (limit > 0 && count >= limit) {
events.close();
break;
}
}
}
return 0;
}
}
@Command(name = "smoke", description = "Runs a bounded open/register/add/advise flow.")
static final class SmokeCommand extends GatewayCommand {
@Option(names = "--client-name", defaultValue = "mxgw-java-smoke", description = "MXAccess client name.")
String clientName;
@Option(names = "--item", required = true, description = "Item definition.")
String item;
SmokeCommand(MxGatewayCliClientFactory clientFactory) {
super(clientFactory);
}
@Override
public Integer call() {
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
var session = client.openSession(OpenSessionRequest.newBuilder()
.setClientSessionName(clientName)
.build());
MxGatewayCliSession cliSession = client.session(session.getSessionId());
int serverHandle = cliSession.register(clientName);
int itemHandle = cliSession.addItem(serverHandle, item);
cliSession.advise(serverHandle, itemHandle);
if (json) {
Map<String, Object> output = new LinkedHashMap<>();
output.put("command", "smoke");
output.put("options", common.redactedJsonMap());
output.put("sessionId", session.getSessionId());
output.put("serverHandle", serverHandle);
output.put("itemHandle", itemHandle);
client.out().println(jsonObject(output));
} else {
client.out().printf(
"session=%s server=%d item=%d%n", session.getSessionId(), serverHandle, itemHandle);
}
client.closeSession(CloseSessionRequest.newBuilder()
.setSessionId(session.getSessionId())
.build());
}
return 0;
}
}
static final class CommonOptions {
@Spec
CommandSpec spec;
@Option(names = "--endpoint", defaultValue = "localhost:5000", description = "Gateway endpoint.")
String endpoint;
@Option(names = "--api-key", description = "Gateway API key.")
String apiKey = "";
@Option(names = "--api-key-env", defaultValue = "MXGATEWAY_API_KEY", description = "API key environment variable.")
String apiKeyEnv;
@Option(names = "--plaintext", description = "Use plaintext transport.")
boolean plaintext;
@Option(names = "--ca-file", description = "CA certificate file.")
Path caFile;
@Option(names = "--server-name-override", description = "TLS server name override.")
String serverNameOverride = "";
@Option(names = "--timeout", defaultValue = "30s", description = "Per-call timeout.")
String timeout;
private String resolvedApiKey = "";
private Duration resolvedTimeout = Duration.ofSeconds(30);
CommonOptions resolved() {
resolvedApiKey = apiKey == null || apiKey.isBlank() ? System.getenv(apiKeyEnv) : apiKey;
if (resolvedApiKey == null) {
resolvedApiKey = "";
}
resolvedTimeout = parseDuration(timeout);
return this;
}
MxGatewayClientOptions toClientOptions() {
return MxGatewayClientOptions.builder()
.endpoint(endpoint)
.apiKey(resolvedApiKey)
.plaintext(plaintext)
.caCertificatePath(caFile)
.serverNameOverride(serverNameOverride)
.callTimeout(resolvedTimeout)
.build();
}
Map<String, Object> redactedJsonMap() {
Map<String, Object> values = new LinkedHashMap<>();
values.put("endpoint", endpoint);
values.put("apiKey", MxGatewaySecrets.redactApiKey(resolvedApiKey));
values.put("apiKeyEnv", apiKeyEnv);
values.put("plaintext", plaintext);
values.put("caFile", caFile == null ? "" : caFile.toString());
values.put("serverNameOverride", serverNameOverride);
values.put("timeout", timeout);
return values;
}
}
interface MxGatewayCliClientFactory {
MxGatewayCliClient connect(CommonOptions options);
}
interface MxGatewayCliClient extends AutoCloseable {
PrintWriter out();
mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply openSession(OpenSessionRequest request);
mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply closeSession(CloseSessionRequest request);
MxGatewayCliSession session(String sessionId);
@Override
void close();
}
interface MxGatewayCliSession {
int register(String clientName);
MxCommandReply registerRaw(String clientName);
int addItem(int serverHandle, String itemDefinition);
MxCommandReply addItemRaw(int serverHandle, String itemDefinition);
void advise(int serverHandle, int itemHandle);
MxCommandReply adviseRaw(int serverHandle, int itemHandle);
MxCommandReply writeRaw(int serverHandle, int itemHandle, MxValue value, int userId);
MxEventStream streamEventsAfter(long afterWorkerSequence);
}
static final class GrpcMxGatewayCliClientFactory implements MxGatewayCliClientFactory {
@Override
public MxGatewayCliClient connect(CommonOptions options) {
return new GrpcMxGatewayCliClient(MxGatewayClient.connect(options.toClientOptions()), options.spec.commandLine().getOut());
}
}
static final class GrpcMxGatewayCliClient implements MxGatewayCliClient {
private final MxGatewayClient client;
private final PrintWriter out;
GrpcMxGatewayCliClient(MxGatewayClient client, PrintWriter out) {
this.client = client;
this.out = out;
}
@Override
public PrintWriter out() {
return out;
}
@Override
public mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply openSession(OpenSessionRequest request) {
return client.openSessionRaw(request);
}
@Override
public mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply closeSession(CloseSessionRequest request) {
return client.closeSessionRaw(request);
}
@Override
public MxGatewayCliSession session(String sessionId) {
return new GrpcMxGatewayCliSession(MxGatewaySession.forSessionId(client, sessionId));
}
@Override
public void close() {
client.close();
}
}
record GrpcMxGatewayCliSession(MxGatewaySession session) implements MxGatewayCliSession {
@Override
public int register(String clientName) {
return session.register(clientName);
}
@Override
public MxCommandReply registerRaw(String clientName) {
return session.registerRaw(clientName);
}
@Override
public int addItem(int serverHandle, String itemDefinition) {
return session.addItem(serverHandle, itemDefinition);
}
@Override
public MxCommandReply addItemRaw(int serverHandle, String itemDefinition) {
return session.addItemRaw(serverHandle, itemDefinition);
}
@Override
public void advise(int serverHandle, int itemHandle) {
session.advise(serverHandle, itemHandle);
}
@Override
public MxCommandReply adviseRaw(int serverHandle, int itemHandle) {
return session.adviseRaw(serverHandle, itemHandle);
}
@Override
public MxCommandReply writeRaw(int serverHandle, int itemHandle, MxValue value, int userId) {
return session.writeRaw(serverHandle, itemHandle, value, userId);
}
@Override
public MxEventStream streamEventsAfter(long afterWorkerSequence) {
return session.streamEventsAfter(afterWorkerSequence);
}
}
interface TextSupplier {
String get();
}
private static void writeOutput(
String command, CommonOptions common, boolean json, Message reply, TextSupplier textSupplier) {
PrintWriter out = common.spec.commandLine().getOut();
if (json) {
Map<String, Object> output = new LinkedHashMap<>();
output.put("command", command);
output.put("options", common.redactedJsonMap());
output.put("reply", new RawJson(protoJson(reply)));
out.println(jsonObject(output));
return;
}
out.println(textSupplier.get());
}
private static MxValue parseValue(String type, String text) {
return switch (type) {
case "bool" -> MxValues.boolValue(Boolean.parseBoolean(text));
case "int32" -> MxValues.int32Value(Integer.parseInt(text));
case "int64" -> MxValues.int64Value(Long.parseLong(text));
case "float" -> MxValues.floatValue(Float.parseFloat(text));
case "double" -> MxValues.doubleValue(Double.parseDouble(text));
case "string" -> MxValues.stringValue(text);
default -> throw new IllegalArgumentException("unsupported value type " + type);
};
}
private static Duration parseDuration(String value) {
if (value == null || value.isBlank()) {
return Duration.ofSeconds(30);
}
if (value.startsWith("P")) {
return Duration.parse(value);
}
if (value.endsWith("ms")) {
return Duration.ofMillis(Long.parseLong(value.substring(0, value.length() - 2)));
}
if (value.endsWith("s")) {
return Duration.ofSeconds(Long.parseLong(value.substring(0, value.length() - 1)));
}
if (value.endsWith("m")) {
return Duration.ofMinutes(Long.parseLong(value.substring(0, value.length() - 1)));
}
return Duration.parse(value);
}
private static String protoJson(Message message) {
try {
return JsonFormat.printer().omittingInsignificantWhitespace().print(message);
} catch (Exception error) {
throw new IllegalStateException("failed to write protobuf JSON", error);
}
}
private static String jsonObject(Map<String, Object> values) {
StringBuilder builder = new StringBuilder();
builder.append('{');
boolean first = true;
for (Map.Entry<String, Object> entry : values.entrySet()) {
if (!first) {
builder.append(',');
}
first = false;
builder.append(jsonString(entry.getKey())).append(':').append(jsonValue(entry.getValue()));
}
builder.append('}');
return builder.toString();
}
@SuppressWarnings("unchecked")
private static String jsonValue(Object value) {
if (value == null) {
return "null";
}
if (value instanceof RawJson rawJson) {
return rawJson.value();
}
if (value instanceof String string) {
return jsonString(string);
}
if (value instanceof Number || value instanceof Boolean) {
return value.toString();
}
if (value instanceof Map<?, ?> map) {
return jsonObject((Map<String, Object>) map);
}
return jsonString(value.toString());
}
private static String jsonString(String value) {
return '"'
+ value.replace("\\", "\\\\")
.replace("\"", "\\\"")
.replace("\r", "\\r")
.replace("\n", "\\n")
+ '"';
}
private record RawJson(String value) {
}
}
@@ -1,27 +1,241 @@
package com.dohertylan.mxgateway.cli;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.PrintWriter;
import java.io.StringWriter;
import mxaccess_gateway.v1.MxaccessGateway.AddItemReply;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandKind;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
import mxaccess_gateway.v1.MxaccessGateway.MxValue;
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply;
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatus;
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode;
import mxaccess_gateway.v1.MxaccessGateway.RegisterReply;
import mxaccess_gateway.v1.MxaccessGateway.SessionState;
import org.junit.jupiter.api.Test;
final class MxGatewayCliTests {
@Test
void versionCommandPrintsProtocolVersions() {
CliRun run = execute(new FakeClientFactory(), "version");
assertEquals(0, run.exitCode());
assertEquals("", run.errors());
assertTrue(run.output().contains("mxgateway-java 0.1.0"));
assertTrue(run.output().contains("gatewayProtocolVersion=1"));
assertTrue(run.output().contains("workerProtocolVersion=1"));
}
@Test
void versionCommandPrintsJson() {
CliRun run = execute(new FakeClientFactory(), "version", "--json");
assertEquals(0, run.exitCode());
assertTrue(run.output().contains("\"clientVersion\":\"0.1.0\""));
assertTrue(run.output().contains("\"gatewayProtocolVersion\":1"));
}
@Test
void openSessionJsonRedactsApiKey() {
CliRun run = execute(
new FakeClientFactory(),
"open-session",
"--endpoint",
"localhost:5000",
"--api-key",
"mxgw_visible_secret",
"--plaintext",
"--client-session-name",
"java-cli",
"--json");
assertEquals(0, run.exitCode());
assertTrue(run.output().contains("\"command\":\"open-session\""));
assertTrue(run.output().contains("\"sessionId\":\"session-cli\""));
assertTrue(run.output().contains("mxgw***********cret"));
assertFalse(run.output().contains("visible_secret"));
}
@Test
void writeBuildsTypedValueFromParserOptions() {
FakeClientFactory factory = new FakeClientFactory();
CliRun run = execute(
factory,
"write",
"--session-id",
"session-cli",
"--server-handle",
"12",
"--item-handle",
"34",
"--type",
"int32",
"--value",
"123",
"--json");
assertEquals(0, run.exitCode());
assertEquals(123, factory.client.session.lastWriteValue.getInt32Value());
assertTrue(run.output().contains("\"kind\":\"MX_COMMAND_KIND_WRITE\""));
}
@Test
void smokeCommandRunsOpenRegisterAddAdviseAndClose() {
FakeClientFactory factory = new FakeClientFactory();
CliRun run = execute(factory, "smoke", "--item", "TestObject.TestInt", "--json");
assertEquals(0, run.exitCode());
assertTrue(factory.client.session.registerCalled);
assertTrue(factory.client.session.addItemCalled);
assertTrue(factory.client.session.adviseCalled);
assertTrue(factory.client.closeCalled);
assertTrue(run.output().contains("\"serverHandle\":42"));
assertTrue(run.output().contains("\"itemHandle\":7"));
}
private static CliRun execute(MxGatewayCli.MxGatewayCliClientFactory factory, String... args) {
StringWriter output = new StringWriter();
StringWriter errors = new StringWriter();
int exitCode = MxGatewayCli.execute(
factory,
new PrintWriter(output, true),
new PrintWriter(errors, true),
"version");
args);
return new CliRun(exitCode, output.toString(), errors.toString());
}
assertEquals(0, exitCode);
assertEquals("", errors.toString());
assertTrue(output.toString().contains("mxgateway-java 0.1.0"));
assertTrue(output.toString().contains("gatewayProtocolVersion=1"));
assertTrue(output.toString().contains("workerProtocolVersion=1"));
private record CliRun(int exitCode, String output, String errors) {
}
private static final class FakeClientFactory implements MxGatewayCli.MxGatewayCliClientFactory {
private FakeClient client;
@Override
public MxGatewayCli.MxGatewayCliClient connect(MxGatewayCli.CommonOptions options) {
client = new FakeClient(options.spec.commandLine().getOut());
return client;
}
}
private static final class FakeClient implements MxGatewayCli.MxGatewayCliClient {
private final PrintWriter out;
private final FakeSession session = new FakeSession();
private boolean closeCalled;
private FakeClient(PrintWriter out) {
this.out = out;
}
@Override
public PrintWriter out() {
return out;
}
@Override
public OpenSessionReply openSession(OpenSessionRequest request) {
return OpenSessionReply.newBuilder()
.setSessionId("session-cli")
.setProtocolStatus(ok())
.build();
}
@Override
public CloseSessionReply closeSession(CloseSessionRequest request) {
closeCalled = true;
return CloseSessionReply.newBuilder()
.setSessionId(request.getSessionId())
.setFinalState(SessionState.SESSION_STATE_CLOSED)
.setProtocolStatus(ok())
.build();
}
@Override
public MxGatewayCli.MxGatewayCliSession session(String sessionId) {
return session;
}
@Override
public void close() {
}
}
private static final class FakeSession implements MxGatewayCli.MxGatewayCliSession {
private boolean registerCalled;
private boolean addItemCalled;
private boolean adviseCalled;
private MxValue lastWriteValue;
@Override
public int register(String clientName) {
registerCalled = true;
return 42;
}
@Override
public MxCommandReply registerRaw(String clientName) {
registerCalled = true;
return MxCommandReply.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_REGISTER)
.setProtocolStatus(ok())
.setRegister(RegisterReply.newBuilder().setServerHandle(42))
.build();
}
@Override
public int addItem(int serverHandle, String itemDefinition) {
addItemCalled = true;
return 7;
}
@Override
public MxCommandReply addItemRaw(int serverHandle, String itemDefinition) {
addItemCalled = true;
return MxCommandReply.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_ADD_ITEM)
.setProtocolStatus(ok())
.setAddItem(AddItemReply.newBuilder().setItemHandle(7))
.build();
}
@Override
public void advise(int serverHandle, int itemHandle) {
adviseCalled = true;
}
@Override
public MxCommandReply adviseRaw(int serverHandle, int itemHandle) {
adviseCalled = true;
return MxCommandReply.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_ADVISE)
.setProtocolStatus(ok())
.build();
}
@Override
public MxCommandReply writeRaw(int serverHandle, int itemHandle, MxValue value, int userId) {
lastWriteValue = value;
return MxCommandReply.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_WRITE)
.setProtocolStatus(ok())
.build();
}
@Override
public com.dohertylan.mxgateway.client.MxEventStream streamEventsAfter(long afterWorkerSequence) {
throw new UnsupportedOperationException("stream-events is covered by client tests");
}
}
private static ProtocolStatus ok() {
return ProtocolStatus.newBuilder()
.setCode(ProtocolStatusCode.PROTOCOL_STATUS_CODE_OK)
.build();
}
}
@@ -4,13 +4,19 @@ plugins {
}
dependencies {
api "com.google.protobuf:protobuf-java-util:${protobufVersion}"
api "com.google.protobuf:protobuf-java:${protobufVersion}"
api "io.grpc:grpc-protobuf:${grpcVersion}"
api "io.grpc:grpc-stub:${grpcVersion}"
implementation "com.google.guava:guava:${guavaVersion}"
implementation "io.grpc:grpc-netty-shaded:${grpcVersion}"
compileOnly 'javax.annotation:javax.annotation-api:1.3.2'
testImplementation "com.google.code.gson:gson:${gsonVersion}"
testImplementation "io.grpc:grpc-inprocess:${grpcVersion}"
testImplementation "io.grpc:grpc-testing:${grpcVersion}"
}
sourceSets {
@@ -0,0 +1,14 @@
package com.dohertylan.mxgateway.client;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatus;
public final class MxAccessException extends MxGatewayCommandException {
public MxAccessException(String operation, ProtocolStatus protocolStatus, MxCommandReply reply) {
super(operation, protocolStatus, reply);
}
public MxAccessException(String operation, MxCommandReply reply) {
super(operation, reply == null ? null : reply.getProtocolStatus(), reply);
}
}
@@ -0,0 +1,117 @@
package com.dohertylan.mxgateway.client;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest;
public final class MxEventStream implements Iterator<MxEvent>, AutoCloseable {
private static final Object END = new Object();
private final BlockingQueue<Object> queue;
private volatile ClientCallStreamObserver<StreamEventsRequest> requestStream;
private volatile boolean closed;
private Object next;
MxEventStream(int capacity) {
queue = new ArrayBlockingQueue<>(capacity);
}
ClientResponseObserver<StreamEventsRequest, MxEvent> observer() {
return new ClientResponseObserver<>() {
@Override
public void beforeStart(ClientCallStreamObserver<StreamEventsRequest> requestStream) {
MxEventStream.this.requestStream = requestStream;
}
@Override
public void onNext(MxEvent value) {
offer(value);
}
@Override
public void onError(Throwable error) {
if (Status.fromThrowable(error).getCode() == Status.Code.CANCELLED && closed) {
offer(END);
return;
}
offer(error);
}
@Override
public void onCompleted() {
offer(END);
}
};
}
@Override
public boolean hasNext() {
if (next == END) {
return false;
}
if (next == null) {
next = take();
}
if (next instanceof RuntimeException runtimeException) {
next = END;
throw runtimeException;
}
if (next instanceof Throwable throwable) {
next = END;
throw new MxGatewayException("gateway stream events failed: " + throwable.getMessage(), throwable);
}
return next != END;
}
@Override
public MxEvent next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
Object value = next;
next = null;
return (MxEvent) value;
}
@Override
public void close() {
closed = true;
ClientCallStreamObserver<StreamEventsRequest> stream = requestStream;
if (stream != null) {
stream.cancel("client cancelled event stream", null);
}
offer(END);
}
private Object take() {
while (true) {
try {
return queue.take();
} catch (InterruptedException error) {
Thread.currentThread().interrupt();
return new StatusRuntimeException(Status.CANCELLED.withDescription("interrupted while reading events"));
}
}
}
private void offer(Object value) {
Objects.requireNonNull(value, "value");
if (value == END) {
queue.offer(value);
return;
}
try {
queue.put(value);
} catch (InterruptedException error) {
Thread.currentThread().interrupt();
}
}
}
@@ -0,0 +1,37 @@
package com.dohertylan.mxgateway.client;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
public final class MxGatewayAuthInterceptor implements ClientInterceptor {
static final Metadata.Key<String> AUTHORIZATION_HEADER =
Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER);
private final String apiKey;
public MxGatewayAuthInterceptor(String apiKey) {
this.apiKey = apiKey == null ? "" : apiKey;
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
if (apiKey.isBlank()) {
return call;
}
return new ForwardingClientCall.SimpleForwardingClientCall<>(call) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
headers.put(AUTHORIZATION_HEADER, "Bearer " + apiKey);
super.start(responseListener, headers);
}
};
}
}
@@ -0,0 +1,7 @@
package com.dohertylan.mxgateway.client;
public final class MxGatewayAuthenticationException extends MxGatewayException {
public MxGatewayAuthenticationException(String message, Throwable cause) {
super(message, cause);
}
}
@@ -0,0 +1,7 @@
package com.dohertylan.mxgateway.client;
public final class MxGatewayAuthorizationException extends MxGatewayException {
public MxGatewayAuthorizationException(String message, Throwable cause) {
super(message, cause);
}
}
@@ -0,0 +1,228 @@
package com.dohertylan.mxgateway.client;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Duration;
import io.grpc.Channel;
import io.grpc.ClientInterceptors;
import io.grpc.ManagedChannel;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLException;
import mxaccess_gateway.v1.MxAccessGatewayGrpc;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest;
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply;
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode;
import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest;
public final class MxGatewayClient implements AutoCloseable {
private final ManagedChannel ownedChannel;
private final MxGatewayClientOptions options;
private final MxAccessGatewayGrpc.MxAccessGatewayBlockingStub blockingStub;
private final MxAccessGatewayGrpc.MxAccessGatewayFutureStub futureStub;
private final MxAccessGatewayGrpc.MxAccessGatewayStub asyncStub;
private MxGatewayClient(ManagedChannel channel, MxGatewayClientOptions options) {
this.ownedChannel = channel;
this.options = options;
Channel intercepted = ClientInterceptors.intercept(channel, new MxGatewayAuthInterceptor(options.apiKey()));
blockingStub = MxAccessGatewayGrpc.newBlockingStub(intercepted);
futureStub = MxAccessGatewayGrpc.newFutureStub(intercepted);
asyncStub = MxAccessGatewayGrpc.newStub(intercepted);
}
public MxGatewayClient(Channel channel, MxGatewayClientOptions options) {
this.ownedChannel = null;
this.options = Objects.requireNonNull(options, "options");
Channel intercepted = ClientInterceptors.intercept(channel, new MxGatewayAuthInterceptor(options.apiKey()));
blockingStub = MxAccessGatewayGrpc.newBlockingStub(intercepted);
futureStub = MxAccessGatewayGrpc.newFutureStub(intercepted);
asyncStub = MxAccessGatewayGrpc.newStub(intercepted);
}
public static MxGatewayClient connect(MxGatewayClientOptions options) {
return new MxGatewayClient(createChannel(options), options);
}
public MxAccessGatewayGrpc.MxAccessGatewayBlockingStub rawBlockingStub() {
return withDeadline(blockingStub);
}
public MxAccessGatewayGrpc.MxAccessGatewayFutureStub rawFutureStub() {
return withDeadline(futureStub);
}
public MxAccessGatewayGrpc.MxAccessGatewayStub rawAsyncStub() {
return withDeadline(asyncStub);
}
public MxGatewaySession openSession(OpenSessionRequest request) {
OpenSessionReply reply = openSessionRaw(request);
return new MxGatewaySession(this, reply);
}
public MxGatewaySession openSession(String clientSessionName) {
return openSession(OpenSessionRequest.newBuilder()
.setClientSessionName(clientSessionName)
.setCommandTimeout(Duration.newBuilder()
.setSeconds(options.callTimeout().toSeconds())
.setNanos(options.callTimeout().toNanosPart())
.build())
.build());
}
public OpenSessionReply openSessionRaw(OpenSessionRequest request) {
try {
OpenSessionReply reply = rawBlockingStub().openSession(request);
MxGatewayErrors.ensureProtocolSuccess("open session", reply.getProtocolStatus(), null);
return reply;
} catch (RuntimeException error) {
if (error instanceof MxGatewayException) {
throw error;
}
throw MxGatewayErrors.fromGrpc("open session", error);
}
}
public CompletableFuture<OpenSessionReply> openSessionAsync(OpenSessionRequest request) {
CompletableFuture<OpenSessionReply> future = toCompletable(rawFutureStub().openSession(request));
return future.thenApply(reply -> {
MxGatewayErrors.ensureProtocolSuccess("open session", reply.getProtocolStatus(), null);
return reply;
});
}
public MxCommandReply invoke(MxCommandRequest request) {
try {
MxCommandReply reply = rawBlockingStub().invoke(request);
MxGatewayErrors.ensureProtocolSuccess("invoke", reply.getProtocolStatus(), reply);
MxGatewayErrors.ensureMxAccessSuccess("invoke", reply);
return reply;
} catch (RuntimeException error) {
if (error instanceof MxGatewayException) {
throw error;
}
throw MxGatewayErrors.fromGrpc("invoke", error);
}
}
public CompletableFuture<MxCommandReply> invokeAsync(MxCommandRequest request) {
CompletableFuture<MxCommandReply> future = toCompletable(rawFutureStub().invoke(request));
return future.thenApply(reply -> {
MxGatewayErrors.ensureProtocolSuccess("invoke", reply.getProtocolStatus(), reply);
MxGatewayErrors.ensureMxAccessSuccess("invoke", reply);
return reply;
});
}
public CloseSessionReply closeSessionRaw(CloseSessionRequest request) {
try {
CloseSessionReply reply = rawBlockingStub().closeSession(request);
MxGatewayErrors.ensureProtocolSuccess("close session", reply.getProtocolStatus(), null);
return reply;
} catch (RuntimeException error) {
if (error instanceof MxGatewayException) {
throw error;
}
throw MxGatewayErrors.fromGrpc("close session", error);
}
}
public MxEventStream streamEvents(StreamEventsRequest request) {
MxEventStream stream = new MxEventStream(16);
rawAsyncStub().streamEvents(request, stream.observer());
return stream;
}
public MxGatewayEventSubscription streamEventsAsync(
StreamEventsRequest request, StreamObserver<MxEvent> observer) {
MxGatewayEventSubscription subscription = new MxGatewayEventSubscription();
rawAsyncStub().streamEvents(request, subscription.wrap(observer));
return subscription;
}
@Override
public void close() {
if (ownedChannel != null) {
ownedChannel.shutdown();
}
}
public void closeAndAwaitTermination() throws InterruptedException {
if (ownedChannel != null) {
ownedChannel.shutdown();
ownedChannel.awaitTermination(options.connectTimeout().toMillis(), TimeUnit.MILLISECONDS);
}
}
private static ManagedChannel createChannel(MxGatewayClientOptions options) {
NettyChannelBuilder builder = NettyChannelBuilder.forTarget(options.endpoint())
.maxInboundMessageSize(16 * 1024 * 1024);
if (!options.connectTimeout().isNegative()) {
builder.withOption(
io.grpc.netty.shaded.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS,
Math.toIntExact(options.connectTimeout().toMillis()));
}
if (options.plaintext()) {
builder.usePlaintext();
} else if (options.caCertificatePath() != null) {
try {
builder.sslContext(GrpcSslContexts.forClient()
.trustManager(options.caCertificatePath().toFile())
.build());
} catch (SSLException error) {
throw new MxGatewayException("failed to configure gateway TLS", error);
}
} else {
builder.useTransportSecurity();
}
if (!options.serverNameOverride().isBlank()) {
builder.overrideAuthority(options.serverNameOverride());
}
return builder.build();
}
private <T extends io.grpc.stub.AbstractStub<T>> T withDeadline(T stub) {
if (options.callTimeout().isNegative()) {
return stub;
}
return stub.withDeadlineAfter(options.callTimeout().toNanos(), TimeUnit.NANOSECONDS);
}
private static <T> CompletableFuture<T> toCompletable(com.google.common.util.concurrent.ListenableFuture<T> source) {
CompletableFuture<T> target = new CompletableFuture<>();
Futures.addCallback(
source,
new FutureCallback<>() {
@Override
public void onSuccess(T result) {
target.complete(result);
}
@Override
public void onFailure(Throwable error) {
if (error instanceof RuntimeException runtimeException) {
target.completeExceptionally(MxGatewayErrors.fromGrpc("async call", runtimeException));
return;
}
target.completeExceptionally(error);
}
},
MoreExecutors.directExecutor());
return target;
}
static ProtocolStatusCode okStatusCode() {
return ProtocolStatusCode.PROTOCOL_STATUS_CODE_OK;
}
}
@@ -0,0 +1,146 @@
package com.dohertylan.mxgateway.client;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Objects;
public final class MxGatewayClientOptions {
private static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(10);
private static final Duration DEFAULT_CALL_TIMEOUT = Duration.ofSeconds(30);
private final String endpoint;
private final String apiKey;
private final boolean plaintext;
private final Path caCertificatePath;
private final String serverNameOverride;
private final Duration connectTimeout;
private final Duration callTimeout;
private MxGatewayClientOptions(Builder builder) {
endpoint = requireText(builder.endpoint, "endpoint");
apiKey = builder.apiKey == null ? "" : builder.apiKey;
plaintext = builder.plaintext;
caCertificatePath = builder.caCertificatePath;
serverNameOverride = builder.serverNameOverride == null ? "" : builder.serverNameOverride;
connectTimeout = builder.connectTimeout == null ? DEFAULT_CONNECT_TIMEOUT : builder.connectTimeout;
callTimeout = builder.callTimeout == null ? DEFAULT_CALL_TIMEOUT : builder.callTimeout;
}
public static Builder builder() {
return new Builder();
}
public String endpoint() {
return endpoint;
}
public String apiKey() {
return apiKey;
}
public String redactedApiKey() {
return MxGatewaySecrets.redactApiKey(apiKey);
}
public boolean plaintext() {
return plaintext;
}
public Path caCertificatePath() {
return caCertificatePath;
}
public String serverNameOverride() {
return serverNameOverride;
}
public Duration connectTimeout() {
return connectTimeout;
}
public Duration callTimeout() {
return callTimeout;
}
@Override
public String toString() {
return "MxGatewayClientOptions{"
+ "endpoint='"
+ endpoint
+ '\''
+ ", apiKey='"
+ redactedApiKey()
+ '\''
+ ", plaintext="
+ plaintext
+ ", caCertificatePath="
+ caCertificatePath
+ ", serverNameOverride='"
+ serverNameOverride
+ '\''
+ ", connectTimeout="
+ connectTimeout
+ ", callTimeout="
+ callTimeout
+ '}';
}
private static String requireText(String value, String name) {
if (value == null || value.isBlank()) {
throw new IllegalArgumentException(name + " is required");
}
return value;
}
public static final class Builder {
private String endpoint;
private String apiKey;
private boolean plaintext;
private Path caCertificatePath;
private String serverNameOverride;
private Duration connectTimeout;
private Duration callTimeout;
private Builder() {
}
public Builder endpoint(String value) {
endpoint = value;
return this;
}
public Builder apiKey(String value) {
apiKey = value;
return this;
}
public Builder plaintext(boolean value) {
plaintext = value;
return this;
}
public Builder caCertificatePath(Path value) {
caCertificatePath = value;
return this;
}
public Builder serverNameOverride(String value) {
serverNameOverride = value;
return this;
}
public Builder connectTimeout(Duration value) {
connectTimeout = Objects.requireNonNull(value, "connectTimeout");
return this;
}
public Builder callTimeout(Duration value) {
callTimeout = Objects.requireNonNull(value, "callTimeout");
return this;
}
public MxGatewayClientOptions build() {
return new MxGatewayClientOptions(this);
}
}
}
@@ -0,0 +1,23 @@
package com.dohertylan.mxgateway.client;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatus;
public class MxGatewayCommandException extends MxGatewayException {
private final ProtocolStatus protocolStatus;
private final MxCommandReply reply;
public MxGatewayCommandException(String operation, ProtocolStatus protocolStatus, MxCommandReply reply) {
super(MxGatewayErrors.protocolStatusMessage(operation, protocolStatus));
this.protocolStatus = protocolStatus;
this.reply = reply;
}
public ProtocolStatus protocolStatus() {
return protocolStatus;
}
public MxCommandReply reply() {
return reply;
}
}
@@ -0,0 +1,72 @@
package com.dohertylan.mxgateway.client;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatus;
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode;
final class MxGatewayErrors {
private MxGatewayErrors() {
}
static RuntimeException fromGrpc(String operation, RuntimeException error) {
if (error instanceof StatusRuntimeException statusError) {
Status status = statusError.getStatus();
String message = MxGatewaySecrets.redactCredentials(status.getDescription());
return switch (status.getCode()) {
case UNAUTHENTICATED -> new MxGatewayAuthenticationException(
"authentication failed: " + message, statusError);
case PERMISSION_DENIED -> new MxGatewayAuthorizationException(
"authorization failed: " + message, statusError);
case DEADLINE_EXCEEDED -> new MxGatewayException("gateway call timed out: " + message, statusError);
case CANCELLED -> new MxGatewayException("gateway call cancelled: " + message, statusError);
default -> new MxGatewayException("gateway " + operation + " failed: " + message, statusError);
};
}
return new MxGatewayException("gateway " + operation + " failed: " + error.getMessage(), error);
}
static void ensureProtocolSuccess(String operation, ProtocolStatus status, MxCommandReply reply) {
if (status == null || status.getCode() == ProtocolStatusCode.PROTOCOL_STATUS_CODE_OK) {
return;
}
throw switch (status.getCode()) {
case PROTOCOL_STATUS_CODE_SESSION_NOT_FOUND, PROTOCOL_STATUS_CODE_SESSION_NOT_READY ->
new MxGatewaySessionException(operation, status);
case PROTOCOL_STATUS_CODE_WORKER_UNAVAILABLE, PROTOCOL_STATUS_CODE_PROTOCOL_VIOLATION ->
new MxGatewayWorkerException(operation, status);
case PROTOCOL_STATUS_CODE_MXACCESS_FAILURE -> new MxAccessException(operation, status, reply);
default -> new MxGatewayCommandException(operation, status, reply);
};
}
static void ensureMxAccessSuccess(String operation, MxCommandReply reply) {
if (reply == null) {
return;
}
if (reply.hasHresult() && reply.getHresult() != 0) {
throw new MxAccessException(operation, reply);
}
for (var status : reply.getStatusesList()) {
if (!MxStatuses.succeeded(status)) {
throw new MxAccessException(operation, reply);
}
}
}
static String protocolStatusMessage(String operation, ProtocolStatus status) {
if (status == null) {
return "mxgateway " + operation + " failed with missing protocol status";
}
if (status.getMessage().isBlank()) {
return "mxgateway " + operation + " failed with protocol status " + status.getCode();
}
return "mxgateway " + operation + " failed with protocol status "
+ status.getCode()
+ ": "
+ status.getMessage();
}
}
@@ -0,0 +1,48 @@
package com.dohertylan.mxgateway.client;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.atomic.AtomicReference;
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest;
public final class MxGatewayEventSubscription implements AutoCloseable {
private final AtomicReference<ClientCallStreamObserver<StreamEventsRequest>> requestStream = new AtomicReference<>();
ClientResponseObserver<StreamEventsRequest, MxEvent> wrap(StreamObserver<MxEvent> observer) {
return new ClientResponseObserver<>() {
@Override
public void beforeStart(ClientCallStreamObserver<StreamEventsRequest> stream) {
requestStream.set(stream);
}
@Override
public void onNext(MxEvent value) {
observer.onNext(value);
}
@Override
public void onError(Throwable error) {
observer.onError(error);
}
@Override
public void onCompleted() {
observer.onCompleted();
}
};
}
public void cancel() {
ClientCallStreamObserver<StreamEventsRequest> stream = requestStream.get();
if (stream != null) {
stream.cancel("client cancelled event stream", null);
}
}
@Override
public void close() {
cancel();
}
}
@@ -0,0 +1,11 @@
package com.dohertylan.mxgateway.client;
public class MxGatewayException extends RuntimeException {
public MxGatewayException(String message) {
super(message);
}
public MxGatewayException(String message, Throwable cause) {
super(message, cause);
}
}
@@ -0,0 +1,33 @@
package com.dohertylan.mxgateway.client;
public final class MxGatewaySecrets {
private MxGatewaySecrets() {
}
public static String redactApiKey(String apiKey) {
if (apiKey == null || apiKey.isEmpty()) {
return "";
}
if (apiKey.length() <= 8) {
return "<redacted>";
}
return apiKey.substring(0, 4)
+ "*".repeat(apiKey.length() - 8)
+ apiKey.substring(apiKey.length() - 4);
}
public static String redactCredentials(String value) {
if (value == null || value.isBlank()) {
return value == null ? "" : value;
}
String[] parts = value.split("\\s+");
for (int index = 0; index < parts.length; index++) {
if (parts[index].startsWith("mxgw_") || parts[index].equalsIgnoreCase("bearer")) {
parts[index] = "<redacted>";
}
}
return String.join(" ", parts);
}
}
@@ -0,0 +1,184 @@
package com.dohertylan.mxgateway.client;
import java.security.SecureRandom;
import java.util.HexFormat;
import java.util.Objects;
import mxaccess_gateway.v1.MxaccessGateway.AddItem2Command;
import mxaccess_gateway.v1.MxaccessGateway.AddItemCommand;
import mxaccess_gateway.v1.MxaccessGateway.AdviseCommand;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.MxCommand;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandKind;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest;
import mxaccess_gateway.v1.MxaccessGateway.MxValue;
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply;
import mxaccess_gateway.v1.MxaccessGateway.RegisterCommand;
import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest;
import mxaccess_gateway.v1.MxaccessGateway.UnregisterCommand;
import mxaccess_gateway.v1.MxaccessGateway.Write2Command;
import mxaccess_gateway.v1.MxaccessGateway.WriteCommand;
public final class MxGatewaySession implements AutoCloseable {
private static final SecureRandom RANDOM = new SecureRandom();
private final MxGatewayClient client;
private final OpenSessionReply openReply;
private CloseSessionReply closeReply;
MxGatewaySession(MxGatewayClient client, OpenSessionReply openReply) {
this.client = Objects.requireNonNull(client, "client");
this.openReply = Objects.requireNonNull(openReply, "openReply");
}
public static MxGatewaySession forSessionId(MxGatewayClient client, String sessionId) {
return new MxGatewaySession(
client, OpenSessionReply.newBuilder().setSessionId(sessionId).build());
}
public String sessionId() {
return openReply.getSessionId();
}
public OpenSessionReply openReply() {
return openReply;
}
public synchronized CloseSessionReply closeRaw() {
if (closeReply == null) {
closeReply = client.closeSessionRaw(CloseSessionRequest.newBuilder()
.setSessionId(sessionId())
.setClientCorrelationId(newCorrelationId())
.build());
}
return closeReply;
}
@Override
public void close() {
closeRaw();
}
public int register(String clientName) {
MxCommandReply reply = registerRaw(clientName);
if (reply.hasRegister()) {
return reply.getRegister().getServerHandle();
}
return reply.getReturnValue().getInt32Value();
}
public MxCommandReply registerRaw(String clientName) {
return invokeCommand(MxCommand.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_REGISTER)
.setRegister(RegisterCommand.newBuilder().setClientName(clientName))
.build());
}
public void unregister(int serverHandle) {
invokeCommand(MxCommand.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_UNREGISTER)
.setUnregister(UnregisterCommand.newBuilder().setServerHandle(serverHandle))
.build());
}
public int addItem(int serverHandle, String itemDefinition) {
MxCommandReply reply = addItemRaw(serverHandle, itemDefinition);
if (reply.hasAddItem()) {
return reply.getAddItem().getItemHandle();
}
return reply.getReturnValue().getInt32Value();
}
public MxCommandReply addItemRaw(int serverHandle, String itemDefinition) {
return invokeCommand(MxCommand.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_ADD_ITEM)
.setAddItem(AddItemCommand.newBuilder()
.setServerHandle(serverHandle)
.setItemDefinition(itemDefinition))
.build());
}
public int addItem2(int serverHandle, String itemDefinition, String itemContext) {
MxCommandReply reply = addItem2Raw(serverHandle, itemDefinition, itemContext);
if (reply.hasAddItem2()) {
return reply.getAddItem2().getItemHandle();
}
return reply.getReturnValue().getInt32Value();
}
public MxCommandReply addItem2Raw(int serverHandle, String itemDefinition, String itemContext) {
return invokeCommand(MxCommand.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_ADD_ITEM2)
.setAddItem2(AddItem2Command.newBuilder()
.setServerHandle(serverHandle)
.setItemDefinition(itemDefinition)
.setItemContext(itemContext))
.build());
}
public void advise(int serverHandle, int itemHandle) {
adviseRaw(serverHandle, itemHandle);
}
public MxCommandReply adviseRaw(int serverHandle, int itemHandle) {
return invokeCommand(MxCommand.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_ADVISE)
.setAdvise(AdviseCommand.newBuilder()
.setServerHandle(serverHandle)
.setItemHandle(itemHandle))
.build());
}
public void write(int serverHandle, int itemHandle, MxValue value, int userId) {
writeRaw(serverHandle, itemHandle, value, userId);
}
public MxCommandReply writeRaw(int serverHandle, int itemHandle, MxValue value, int userId) {
return invokeCommand(MxCommand.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_WRITE)
.setWrite(WriteCommand.newBuilder()
.setServerHandle(serverHandle)
.setItemHandle(itemHandle)
.setValue(value)
.setUserId(userId))
.build());
}
public void write2(int serverHandle, int itemHandle, MxValue value, MxValue timestampValue, int userId) {
invokeCommand(MxCommand.newBuilder()
.setKind(MxCommandKind.MX_COMMAND_KIND_WRITE2)
.setWrite2(Write2Command.newBuilder()
.setServerHandle(serverHandle)
.setItemHandle(itemHandle)
.setValue(value)
.setTimestampValue(timestampValue)
.setUserId(userId))
.build());
}
public MxEventStream streamEvents() {
return streamEventsAfter(0);
}
public MxEventStream streamEventsAfter(long afterWorkerSequence) {
return client.streamEvents(StreamEventsRequest.newBuilder()
.setSessionId(sessionId())
.setAfterWorkerSequence(afterWorkerSequence)
.build());
}
public MxCommandReply invokeCommand(MxCommand command) {
return client.invoke(MxCommandRequest.newBuilder()
.setSessionId(sessionId())
.setClientCorrelationId(newCorrelationId())
.setCommand(command)
.build());
}
private static String newCorrelationId() {
byte[] bytes = new byte[16];
RANDOM.nextBytes(bytes);
return HexFormat.of().formatHex(bytes);
}
}
@@ -0,0 +1,16 @@
package com.dohertylan.mxgateway.client;
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatus;
public final class MxGatewaySessionException extends MxGatewayException {
private final ProtocolStatus protocolStatus;
public MxGatewaySessionException(String operation, ProtocolStatus protocolStatus) {
super(MxGatewayErrors.protocolStatusMessage(operation, protocolStatus));
this.protocolStatus = protocolStatus;
}
public ProtocolStatus protocolStatus() {
return protocolStatus;
}
}
@@ -0,0 +1,16 @@
package com.dohertylan.mxgateway.client;
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatus;
public final class MxGatewayWorkerException extends MxGatewayException {
private final ProtocolStatus protocolStatus;
public MxGatewayWorkerException(String operation, ProtocolStatus protocolStatus) {
super(MxGatewayErrors.protocolStatusMessage(operation, protocolStatus));
this.protocolStatus = protocolStatus;
}
public ProtocolStatus protocolStatus() {
return protocolStatus;
}
}
@@ -0,0 +1,48 @@
package com.dohertylan.mxgateway.client;
import mxaccess_gateway.v1.MxaccessGateway.MxStatusCategory;
import mxaccess_gateway.v1.MxaccessGateway.MxStatusProxy;
import mxaccess_gateway.v1.MxaccessGateway.MxStatusSource;
public final class MxStatuses {
private MxStatuses() {
}
public static boolean succeeded(MxStatusProxy status) {
return status == null || status.getSuccess() != 0;
}
public static MxStatusView view(MxStatusProxy status) {
return new MxStatusView(status);
}
public record MxStatusView(MxStatusProxy raw) {
public int success() {
return raw.getSuccess();
}
public MxStatusCategory category() {
return raw.getCategory();
}
public MxStatusSource detectedBy() {
return raw.getDetectedBy();
}
public int detail() {
return raw.getDetail();
}
public int rawCategory() {
return raw.getRawCategory();
}
public int rawDetectedBy() {
return raw.getRawDetectedBy();
}
public String diagnosticText() {
return raw.getDiagnosticText();
}
}
}
@@ -0,0 +1,170 @@
package com.dohertylan.mxgateway.client;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import mxaccess_gateway.v1.MxaccessGateway.BoolArray;
import mxaccess_gateway.v1.MxaccessGateway.DoubleArray;
import mxaccess_gateway.v1.MxaccessGateway.FloatArray;
import mxaccess_gateway.v1.MxaccessGateway.Int32Array;
import mxaccess_gateway.v1.MxaccessGateway.Int64Array;
import mxaccess_gateway.v1.MxaccessGateway.MxArray;
import mxaccess_gateway.v1.MxaccessGateway.MxDataType;
import mxaccess_gateway.v1.MxaccessGateway.MxValue;
import mxaccess_gateway.v1.MxaccessGateway.RawArray;
import mxaccess_gateway.v1.MxaccessGateway.StringArray;
import mxaccess_gateway.v1.MxaccessGateway.TimestampArray;
public final class MxValues {
private MxValues() {
}
public static MxValue boolValue(boolean value) {
return MxValue.newBuilder()
.setDataType(MxDataType.MX_DATA_TYPE_BOOLEAN)
.setVariantType("VT_BOOL")
.setBoolValue(value)
.build();
}
public static MxValue int32Value(int value) {
return MxValue.newBuilder()
.setDataType(MxDataType.MX_DATA_TYPE_INTEGER)
.setVariantType("VT_I4")
.setInt32Value(value)
.build();
}
public static MxValue int64Value(long value) {
return MxValue.newBuilder()
.setDataType(MxDataType.MX_DATA_TYPE_INTEGER)
.setVariantType("VT_I8")
.setInt64Value(value)
.build();
}
public static MxValue floatValue(float value) {
return MxValue.newBuilder()
.setDataType(MxDataType.MX_DATA_TYPE_FLOAT)
.setVariantType("VT_R4")
.setFloatValue(value)
.build();
}
public static MxValue doubleValue(double value) {
return MxValue.newBuilder()
.setDataType(MxDataType.MX_DATA_TYPE_DOUBLE)
.setVariantType("VT_R8")
.setDoubleValue(value)
.build();
}
public static MxValue stringValue(String value) {
return MxValue.newBuilder()
.setDataType(MxDataType.MX_DATA_TYPE_STRING)
.setVariantType("VT_BSTR")
.setStringValue(value)
.build();
}
public static MxValue timestampValue(Instant value) {
return MxValue.newBuilder()
.setDataType(MxDataType.MX_DATA_TYPE_TIME)
.setVariantType("VT_DATE")
.setTimestampValue(Timestamp.newBuilder()
.setSeconds(value.getEpochSecond())
.setNanos(value.getNano())
.build())
.build();
}
public static Object nativeValue(MxValue value) {
if (value == null || value.getIsNull()) {
return null;
}
return switch (value.getKindCase()) {
case BOOL_VALUE -> value.getBoolValue();
case INT32_VALUE -> value.getInt32Value();
case INT64_VALUE -> value.getInt64Value();
case FLOAT_VALUE -> value.getFloatValue();
case DOUBLE_VALUE -> value.getDoubleValue();
case STRING_VALUE -> value.getStringValue();
case TIMESTAMP_VALUE -> instant(value.getTimestampValue());
case ARRAY_VALUE -> nativeArray(value.getArrayValue());
case RAW_VALUE -> value.getRawValue().toByteArray();
case KIND_NOT_SET -> null;
};
}
public static Object nativeArray(MxArray array) {
if (array == null) {
return null;
}
return switch (array.getValuesCase()) {
case BOOL_VALUES -> List.copyOf(array.getBoolValues().getValuesList());
case INT32_VALUES -> List.copyOf(array.getInt32Values().getValuesList());
case INT64_VALUES -> List.copyOf(array.getInt64Values().getValuesList());
case FLOAT_VALUES -> List.copyOf(array.getFloatValues().getValuesList());
case DOUBLE_VALUES -> List.copyOf(array.getDoubleValues().getValuesList());
case STRING_VALUES -> List.copyOf(array.getStringValues().getValuesList());
case TIMESTAMP_VALUES -> timestampValues(array.getTimestampValues());
case RAW_VALUES -> rawValues(array.getRawValues());
case VALUES_NOT_SET -> List.of();
};
}
public static MxArray stringArray(List<String> values) {
return MxArray.newBuilder()
.setElementDataType(MxDataType.MX_DATA_TYPE_STRING)
.setVariantType("VT_ARRAY|VT_BSTR")
.addDimensions(values.size())
.setStringValues(StringArray.newBuilder().addAllValues(values))
.build();
}
public static MxArray int32Array(List<Integer> values) {
return MxArray.newBuilder()
.setElementDataType(MxDataType.MX_DATA_TYPE_INTEGER)
.setVariantType("VT_ARRAY|VT_I4")
.addDimensions(values.size())
.setInt32Values(Int32Array.newBuilder().addAllValues(values))
.build();
}
public static String kindName(MxValue value) {
return value == null ? "KIND_NOT_SET" : value.getKindCase().name();
}
private static Instant instant(Timestamp timestamp) {
return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos());
}
private static List<Instant> timestampValues(TimestampArray array) {
List<Instant> values = new ArrayList<>();
for (Timestamp timestamp : array.getValuesList()) {
values.add(instant(timestamp));
}
return values;
}
private static List<byte[]> rawValues(RawArray array) {
List<byte[]> values = new ArrayList<>();
for (ByteString rawValue : array.getValuesList()) {
values.add(rawValue.toByteArray());
}
return values;
}
@SuppressWarnings("unused")
private static void generatedTypeReferences(
BoolArray boolArray,
Int64Array int64Array,
FloatArray floatArray,
DoubleArray doubleArray) {
// Keeps generated repeated-value imports visible for javadocs and IDE navigation.
}
}
@@ -0,0 +1,243 @@
package com.dohertylan.mxgateway.client;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import io.grpc.Context;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import mxaccess_gateway.v1.MxAccessGatewayGrpc;
import mxaccess_gateway.v1.MxaccessGateway.AddItemReply;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandKind;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest;
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply;
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatus;
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode;
import mxaccess_gateway.v1.MxaccessGateway.RegisterReply;
import mxaccess_gateway.v1.MxaccessGateway.SessionState;
import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest;
import org.junit.jupiter.api.Test;
final class MxGatewayClientSessionTests {
@Test
void unaryCallsCarryAuthMetadataAndDeadline() throws Exception {
AtomicReference<String> authorization = new AtomicReference<>();
AtomicReference<MxCommandRequest> commandRequest = new AtomicReference<>();
AtomicReference<Boolean> deadlineSeen = new AtomicReference<>(false);
TestGatewayService service = new TestGatewayService() {
@Override
public void openSession(OpenSessionRequest request, StreamObserver<OpenSessionReply> responseObserver) {
deadlineSeen.set(Context.current().getDeadline() != null);
responseObserver.onNext(OpenSessionReply.newBuilder()
.setSessionId("session-java")
.setProtocolStatus(ok())
.build());
responseObserver.onCompleted();
}
@Override
public void invoke(MxCommandRequest request, StreamObserver<MxCommandReply> responseObserver) {
commandRequest.set(request);
responseObserver.onNext(MxCommandReply.newBuilder()
.setSessionId(request.getSessionId())
.setKind(request.getCommand().getKind())
.setProtocolStatus(ok())
.setRegister(RegisterReply.newBuilder().setServerHandle(42))
.build());
responseObserver.onCompleted();
}
};
try (InProcessGateway gateway = InProcessGateway.start(service, authorization);
MxGatewayClient client = gateway.client("mxgw_visible_secret", Duration.ofSeconds(5))) {
MxGatewaySession session = client.openSession("junit-session");
int serverHandle = session.register("java-test-client");
assertEquals(42, serverHandle);
assertEquals("Bearer mxgw_visible_secret", authorization.get());
assertEquals("session-java", commandRequest.get().getSessionId());
assertEquals(MxCommandKind.MX_COMMAND_KIND_REGISTER, commandRequest.get().getCommand().getKind());
assertTrue(deadlineSeen.get());
}
}
@Test
void methodHelpersReturnTypedHandlesAndRawReplies() throws Exception {
TestGatewayService service = new TestGatewayService() {
@Override
public void invoke(MxCommandRequest request, StreamObserver<MxCommandReply> responseObserver) {
MxCommandReply.Builder reply = MxCommandReply.newBuilder()
.setSessionId(request.getSessionId())
.setKind(request.getCommand().getKind())
.setProtocolStatus(ok());
if (request.getCommand().getKind() == MxCommandKind.MX_COMMAND_KIND_ADD_ITEM) {
reply.setAddItem(AddItemReply.newBuilder().setItemHandle(7));
}
responseObserver.onNext(reply.build());
responseObserver.onCompleted();
}
};
try (InProcessGateway gateway = InProcessGateway.start(service, new AtomicReference<>());
MxGatewayClient client = gateway.client("", Duration.ofSeconds(5))) {
MxGatewaySession session = MxGatewaySession.forSessionId(client, "existing-session");
int itemHandle = session.addItem(12, "TestObject.TestInt");
MxCommandReply raw = session.adviseRaw(12, itemHandle);
assertEquals(7, itemHandle);
assertEquals(MxCommandKind.MX_COMMAND_KIND_ADVISE, raw.getKind());
}
}
@Test
void streamCancellationCancelsServerCall() throws Exception {
CountDownLatch cancelled = new CountDownLatch(1);
TestGatewayService service = new TestGatewayService() {
@Override
public void streamEvents(StreamEventsRequest request, StreamObserver<MxEvent> responseObserver) {
ServerCallStreamObserver<MxEvent> serverObserver =
(ServerCallStreamObserver<MxEvent>) responseObserver;
serverObserver.setOnCancelHandler(cancelled::countDown);
responseObserver.onNext(MxEvent.newBuilder()
.setSessionId(request.getSessionId())
.setWorkerSequence(1)
.build());
}
};
try (InProcessGateway gateway = InProcessGateway.start(service, new AtomicReference<>());
MxGatewayClient client = gateway.client("", Duration.ofSeconds(5))) {
MxEventStream events = MxGatewaySession.forSessionId(client, "stream-session").streamEvents();
assertTrue(events.hasNext());
assertEquals(1, events.next().getWorkerSequence());
events.close();
assertTrue(cancelled.await(5, TimeUnit.SECONDS));
}
}
@Test
void commandFailureKeepsRawReply() throws Exception {
TestGatewayService service = new TestGatewayService() {
@Override
public void invoke(MxCommandRequest request, StreamObserver<MxCommandReply> responseObserver) {
responseObserver.onNext(MxCommandReply.newBuilder()
.setSessionId(request.getSessionId())
.setKind(MxCommandKind.MX_COMMAND_KIND_WRITE)
.setProtocolStatus(ProtocolStatus.newBuilder()
.setCode(ProtocolStatusCode.PROTOCOL_STATUS_CODE_MXACCESS_FAILURE)
.setMessage("MXAccess rejected the write."))
.setHresult(-2147220992)
.build());
responseObserver.onCompleted();
}
};
try (InProcessGateway gateway = InProcessGateway.start(service, new AtomicReference<>());
MxGatewayClient client = gateway.client("", Duration.ofSeconds(5))) {
MxGatewaySession session = MxGatewaySession.forSessionId(client, "failure-session");
MxAccessException error = assertThrows(
MxAccessException.class,
() -> session.write(1, 2, MxValues.int32Value(123), 0));
assertNotNull(error.reply());
assertEquals(-2147220992, error.reply().getHresult());
}
}
private static ProtocolStatus ok() {
return ProtocolStatus.newBuilder()
.setCode(ProtocolStatusCode.PROTOCOL_STATUS_CODE_OK)
.build();
}
private abstract static class TestGatewayService extends MxAccessGatewayGrpc.MxAccessGatewayImplBase {
@Override
public void openSession(OpenSessionRequest request, StreamObserver<OpenSessionReply> responseObserver) {
responseObserver.onNext(OpenSessionReply.newBuilder()
.setSessionId("session-java")
.setProtocolStatus(ok())
.build());
responseObserver.onCompleted();
}
@Override
public void closeSession(CloseSessionRequest request, StreamObserver<CloseSessionReply> responseObserver) {
responseObserver.onNext(CloseSessionReply.newBuilder()
.setSessionId(request.getSessionId())
.setFinalState(SessionState.SESSION_STATE_CLOSED)
.setProtocolStatus(ok())
.build());
responseObserver.onCompleted();
}
}
private record InProcessGateway(Server server, ManagedChannel channel) implements AutoCloseable {
static InProcessGateway start(
MxAccessGatewayGrpc.MxAccessGatewayImplBase service, AtomicReference<String> authorization)
throws Exception {
String serverName = "mxgw-java-" + UUID.randomUUID();
ServerInterceptor interceptor = new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
authorization.set(headers.get(MxGatewayAuthInterceptor.AUTHORIZATION_HEADER));
return next.startCall(call, headers);
}
};
Server server = InProcessServerBuilder.forName(serverName)
.directExecutor()
.addService(io.grpc.ServerInterceptors.intercept(service, interceptor))
.build()
.start();
ManagedChannel channel = InProcessChannelBuilder.forName(serverName)
.directExecutor()
.build();
return new InProcessGateway(server, channel);
}
MxGatewayClient client(String apiKey, Duration callTimeout) {
return new MxGatewayClient(
channel,
MxGatewayClientOptions.builder()
.endpoint("in-process")
.apiKey(apiKey)
.plaintext(true)
.callTimeout(callTimeout)
.build());
}
@Override
public void close() {
channel.shutdownNow();
server.shutdownNow();
}
}
}
@@ -0,0 +1,136 @@
package com.dohertylan.mxgateway.client;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.protobuf.util.JsonFormat;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.List;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
import mxaccess_gateway.v1.MxaccessGateway.MxStatusCategory;
import mxaccess_gateway.v1.MxaccessGateway.MxStatusProxy;
import mxaccess_gateway.v1.MxaccessGateway.MxValue;
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode;
import org.junit.jupiter.api.Test;
final class MxGatewayFixtureTests {
@Test
void valueFixtureCasesExposeNativeProjectionAndRawMetadata() throws Exception {
JsonArray cases = readFixture("values/value-conversion-cases.json").getAsJsonArray("cases");
for (var element : cases) {
JsonObject testCase = element.getAsJsonObject();
MxValue.Builder builder = MxValue.newBuilder();
JsonFormat.parser().merge(testCase.getAsJsonObject("value").toString(), builder);
MxValue value = builder.build();
assertEquals(testCase.get("expectedKind").getAsString(), lowerCamelKind(value));
if ("timestamp.utc".equals(testCase.get("id").getAsString())) {
assertEquals(Instant.parse("2026-01-01T00:00:04Z"), MxValues.nativeValue(value));
}
if ("string-array".equals(testCase.get("id").getAsString())) {
assertEquals(List.of("alpha", "beta"), MxValues.nativeValue(value));
}
if ("raw-fallback.variant".equals(testCase.get("id").getAsString())) {
assertEquals("No lossless typed projection exists for this VARIANT.", value.getRawDiagnostic());
assertArrayEquals(new byte[] {1, 2, 3, 4, 5}, (byte[]) MxValues.nativeValue(value));
}
}
}
@Test
void statusFixtureCasesPreserveRawFields() throws Exception {
JsonArray cases = readFixture("statuses/status-conversion-cases.json").getAsJsonArray("cases");
for (var element : cases) {
JsonObject testCase = element.getAsJsonObject();
MxStatusProxy.Builder builder = MxStatusProxy.newBuilder();
JsonFormat.parser().merge(testCase.getAsJsonObject("status").toString(), builder);
MxStatusProxy status = builder.build();
assertEquals(
testCase.getAsJsonObject("status").get("rawCategory").getAsInt(),
status.getRawCategory());
if ("ok.responding-lmx".equals(testCase.get("id").getAsString())) {
assertTrue(MxStatuses.succeeded(status));
}
if ("security-error.requesting-lmx".equals(testCase.get("id").getAsString())) {
assertFalse(MxStatuses.succeeded(status));
assertEquals(MxStatusCategory.MX_STATUS_CATEGORY_SECURITY_ERROR, MxStatuses.view(status).category());
}
}
}
@Test
void mxAccessFailureFixtureMapsToRichCommandException() throws Exception {
MxCommandReply.Builder builder = MxCommandReply.newBuilder();
JsonFormat.parser().merge(
Files.readString(fixtureRoot().resolve("command-replies/write.mxaccess-failure.reply.json")),
builder);
MxCommandReply reply = builder.build();
try {
MxGatewayErrors.ensureProtocolSuccess("invoke", reply.getProtocolStatus(), reply);
} catch (MxAccessException error) {
assertEquals(ProtocolStatusCode.PROTOCOL_STATUS_CODE_MXACCESS_FAILURE, error.protocolStatus().getCode());
assertEquals(-2147220992, error.reply().getHresult());
assertEquals(2, error.reply().getStatusesCount());
return;
}
throw new AssertionError("expected MxAccessException");
}
@Test
void grpcAuthErrorsAreClassifiedAndRedacted() {
RuntimeException authError = MxGatewayErrors.fromGrpc(
"open session",
new io.grpc.StatusRuntimeException(io.grpc.Status.UNAUTHENTICATED.withDescription(
"invalid API key mxgw_visible_secret")));
RuntimeException permissionError = MxGatewayErrors.fromGrpc(
"write",
new io.grpc.StatusRuntimeException(io.grpc.Status.PERMISSION_DENIED.withDescription(
"missing scope mxaccess.write")));
assertInstanceOf(MxGatewayAuthenticationException.class, authError);
assertInstanceOf(MxGatewayAuthorizationException.class, permissionError);
assertTrue(authError.getMessage().contains("<redacted>"));
assertFalse(authError.getMessage().contains("visible_secret"));
}
private static JsonObject readFixture(String relativePath) throws Exception {
return JsonParser.parseString(Files.readString(fixtureRoot().resolve(relativePath))).getAsJsonObject();
}
private static Path fixtureRoot() {
Path current = Path.of(System.getProperty("user.dir")).toAbsolutePath();
for (Path path = current; path != null; path = path.getParent()) {
Path candidate = path.resolve("clients/proto/fixtures/behavior");
if (Files.exists(candidate)) {
return candidate;
}
candidate = path.resolve("../proto/fixtures/behavior").normalize();
if (Files.exists(candidate)) {
return candidate;
}
}
throw new IllegalStateException("could not locate behavior fixtures from " + current);
}
private static String lowerCamelKind(MxValue value) {
String[] parts = value.getKindCase().name().toLowerCase().split("_");
StringBuilder result = new StringBuilder(parts[0]);
for (int index = 1; index < parts.length; index++) {
result.append(Character.toUpperCase(parts[index].charAt(0))).append(parts[index].substring(1));
}
return result.toString();
}
}
@@ -0,0 +1,469 @@
{
"schemaVersion": 1,
"fixtureSet": "mxaccess-gateway-parity-fixture-matrix",
"contractName": "mxaccess-gateway",
"gatewayProtocolVersion": 1,
"workerProtocolVersion": 1,
"sourceCaptureRoot": "C:/Users/dohertj2/Desktop/mxaccess/captures",
"sourceDocs": [
"C:/Users/dohertj2/Desktop/mxaccess/docs/MXAccess-Public-API.md",
"C:/Users/dohertj2/Desktop/mxaccess/docs/Current-Sprint-State.md"
],
"comparisonFormat": {
"description": "Each parity run records the same command against direct MXAccess and the gateway-backed worker, then compares raw parity fields instead of client wrapper behavior.",
"directMxAccess": {
"requiredFields": [
"method",
"arguments",
"returnedValue",
"hresult",
"exceptionType",
"statuses",
"events"
]
},
"gatewayResult": {
"requiredFields": [
"kind",
"protocolStatus",
"returnValue",
"hresult",
"statuses",
"diagnosticMessage",
"events"
]
},
"eventFields": [
"family",
"serverHandle",
"itemHandle",
"value",
"quality",
"sourceTimestamp",
"statuses",
"workerSequence",
"workerTimestamp",
"gatewayReceiveTimestamp",
"hresult",
"rawStatus"
],
"comparisonKeys": [
"hresult",
"exceptionType",
"returnedValue",
"statusArrayShape",
"statusRawFields",
"eventFamilyOrder",
"eventPayloadShape",
"valueProjection",
"rawFallbackMetadata"
]
},
"methodFixtures": [
{
"id": "method.register.basic",
"method": "Register",
"commandKind": "MX_COMMAND_KIND_REGISTER",
"status": "planned_fixture",
"captureReferences": [
"captures/001-register/harness.log",
"captures/047-frida-com-proxy-register/harness.log"
],
"assertions": [
"preserve returned server handle in returnValue and RegisterReply",
"preserve success HRESULT as 0",
"do not emit MXAccess events for register"
]
},
{
"id": "method.unregister.basic",
"method": "Unregister",
"commandKind": "MX_COMMAND_KIND_UNREGISTER",
"status": "planned_fixture",
"captureReferences": [
"captures/001-register/harness.log",
"captures/109-native-post-remove-errors/harness.log"
],
"assertions": [
"preserve void return shape with explicit protocol success",
"preserve HRESULT or COM exception details for invalid server handle",
"close registered handle only after MXAccess succeeds"
]
},
{
"id": "method.add-item.scalar",
"method": "AddItem",
"commandKind": "MX_COMMAND_KIND_ADD_ITEM",
"status": "planned_fixture",
"captureReferences": [
"captures/002-add-remove-scalar/harness.log",
"captures/006-add-invalid/harness.log"
],
"assertions": [
"preserve returned item handle in returnValue and AddItemReply",
"preserve invalid item reference HRESULT/status details",
"do not prevalidate item definition in the gateway"
]
},
{
"id": "method.add-item2.context",
"method": "AddItem2",
"commandKind": "MX_COMMAND_KIND_ADD_ITEM2",
"status": "planned_fixture",
"captureReferences": [
"captures/mxaccess-additem2-testint-context.log",
"captures/121-frida-buffered-history-testhistoryvalue-context/harness.log"
],
"assertions": [
"pass item_definition and item_context exactly as supplied",
"preserve returned item handle in returnValue and AddItem2Reply",
"compare context-bearing reference resolution against direct MXAccess"
]
},
{
"id": "method.remove-item.basic",
"method": "RemoveItem",
"commandKind": "MX_COMMAND_KIND_REMOVE_ITEM",
"status": "planned_fixture",
"captureReferences": [
"captures/002-add-remove-scalar/harness.log",
"captures/109-native-post-remove-errors/harness.log"
],
"assertions": [
"preserve void return shape with explicit protocol success",
"preserve post-remove and invalid-handle HRESULT/status behavior",
"remove diagnostic handle state only after MXAccess succeeds"
]
},
{
"id": "method.advise.supervisory-data-change",
"method": "Advise",
"commandKind": "MX_COMMAND_KIND_ADVISE",
"status": "planned_fixture",
"captureReferences": [
"captures/003-subscribe-scalars/harness.log",
"captures/058-frida-subscribe-testint/harness.log"
],
"assertions": [
"preserve successful command reply shape",
"forward OnDataChange with value, quality, timestamp, and status array",
"preserve per-worker event order"
]
},
{
"id": "method.unadvise.basic",
"method": "UnAdvise",
"commandKind": "MX_COMMAND_KIND_UN_ADVISE",
"status": "planned_fixture",
"captureReferences": [
"captures/058-frida-subscribe-testint/harness.log",
"captures/007-subscribe-invalid/harness.log"
],
"assertions": [
"preserve void return shape with explicit protocol success",
"preserve invalid item handle HRESULT/status behavior",
"do not distinguish plain and supervisory cleanup beyond MXAccess behavior"
]
},
{
"id": "method.advise-supervisory.basic",
"method": "AdviseSupervisory",
"commandKind": "MX_COMMAND_KIND_ADVISE_SUPERVISORY",
"status": "planned_fixture",
"captureReferences": [
"captures/058-frida-subscribe-testint/harness.log",
"captures/105-frida-advise-shortdesc-prebound-fixed/harness.log"
],
"assertions": [
"keep AdviseSupervisory distinct from plain Advise in command kind",
"forward native OnDataChange only when MXAccess emits it",
"compare supervisory item status arrays without normalization"
]
},
{
"id": "method.add-buffered-item.context",
"method": "AddBufferedItem",
"commandKind": "MX_COMMAND_KIND_ADD_BUFFERED_ITEM",
"status": "planned_fixture",
"captureReferences": [
"captures/079-frida-add-buffered-advise-testint/harness.log",
"captures/120-frida-buffered-history-testhistoryvalue/harness.log",
"captures/121-frida-buffered-history-testhistoryvalue-context/harness.log"
],
"assertions": [
"pass item_definition and item_context exactly as supplied",
"preserve returned buffered item handle in returnValue and AddBufferedItemReply",
"keep buffered registration distinct from normal AddItem2"
]
},
{
"id": "method.set-buffered-update-interval.basic",
"method": "SetBufferedUpdateInterval",
"commandKind": "MX_COMMAND_KIND_SET_BUFFERED_UPDATE_INTERVAL",
"status": "planned_fixture",
"captureReferences": [
"captures/mxaccess-set-buffered-interval-1000.log",
"captures/079-frida-add-buffered-advise-testint/harness.log"
],
"assertions": [
"preserve requested update interval without clamping in the gateway",
"preserve void return shape with explicit protocol success",
"compare buffered event cadence only in opt-in live runs"
]
},
{
"id": "method.suspend.scan-state",
"method": "Suspend",
"commandKind": "MX_COMMAND_KIND_SUSPEND",
"status": "planned_fixture",
"captureReferences": [
"captures/077-frida-suspend-advised-scanstate/harness.log",
"captures/118-frida-suspend-advised-scanstate-long/harness.log"
],
"assertions": [
"preserve out MxStatus in SuspendReply and repeated statuses",
"preserve HRESULT separately from status detail",
"do not synthesize OperationComplete if native MXAccess does not raise it"
]
},
{
"id": "method.activate.scan-state",
"method": "Activate",
"commandKind": "MX_COMMAND_KIND_ACTIVATE",
"status": "planned_fixture",
"captureReferences": [
"captures/078-frida-activate-advised-scanstate/harness.log",
"captures/119-frida-activate-advised-scanstate-long/harness.log"
],
"assertions": [
"preserve out MxStatus in ActivateReply and repeated statuses",
"preserve HRESULT separately from status detail",
"do not synthesize OperationComplete if native MXAccess does not raise it"
]
},
{
"id": "method.write.value-status-matrix",
"method": "Write",
"commandKind": "MX_COMMAND_KIND_WRITE",
"status": "planned_fixture",
"captureReferences": [
"captures/023-frida-write-test-int-sequence-109-111/harness.log",
"captures/024-frida-write-test-bool-sequence/harness.log",
"captures/089-frida-write-testint-wrong-type/harness.log",
"captures/090-frida-write-invalid-reference/harness.log",
"captures/107-native-write-testint-current/harness.log"
],
"assertions": [
"preserve scalar and array value projections plus raw fallback metadata",
"preserve wrong-type and invalid-reference HRESULT/status arrays",
"forward OnWriteComplete only when native MXAccess emits it"
]
},
{
"id": "method.write2.timestamped",
"method": "Write2",
"commandKind": "MX_COMMAND_KIND_WRITE2",
"status": "planned_fixture",
"captureReferences": [
"captures/042-frida-write2-test-int-timestamp/harness.log",
"captures/066-frida-write2-test-bool-timestamp/harness.log",
"captures/075-frida-write2-test-datetime-array-timestamp/harness.log"
],
"assertions": [
"preserve timestamp_value as an MXAccess VARIANT projection",
"preserve write value shape and HRESULT/status arrays",
"compare timestamped write completion events against direct MXAccess"
]
},
{
"id": "method.write-secured.rejection-gap",
"method": "WriteSecured",
"commandKind": "MX_COMMAND_KIND_WRITE_SECURED",
"status": "documented_gap",
"captureReferences": [
"captures/036-frida-write-secured-test-int/harness.log",
"captures/111-frida-write-secured-auth-protectedvalue/harness.log",
"captures/112-frida-write-secured-auth-verified-protectedvalue1/harness.log"
],
"assertions": [
"preserve observed 0x80004021 rejection before a value-bearing NMX body",
"preserve current_user_id and verifier_user_id only as command inputs, not logs",
"upgrade this gap to planned_fixture when a successful direct WriteSecured path is observed"
]
},
{
"id": "method.write-secured2.authenticated",
"method": "WriteSecured2",
"commandKind": "MX_COMMAND_KIND_WRITE_SECURED2",
"status": "planned_fixture",
"captureReferences": [
"captures/113-frida-write-secured2-auth-protectedvalue/harness.log",
"captures/116-frida-write-secured2-auth-verified-protectedvalue1/harness.log",
"captures/117-frida-write-secured2-auth-testint/harness.log"
],
"assertions": [
"preserve authenticated timestamped secured write body shape",
"preserve HRESULT/status arrays without logging credential-bearing values",
"do not synthesize OnWriteComplete when direct MXAccess does not emit it"
]
},
{
"id": "method.authenticate-user.basic",
"method": "AuthenticateUser",
"commandKind": "MX_COMMAND_KIND_AUTHENTICATE_USER",
"status": "planned_fixture",
"captureReferences": [
"captures/087-frida-authenticate-administrator-empty/harness.log",
"captures/088-frida-authenticate-invalid-empty/harness.log"
],
"assertions": [
"preserve returned user id in returnValue and AuthenticateUserReply",
"preserve invalid credential HRESULT/status behavior",
"redact verify_user_password from logs and diagnostics"
]
},
{
"id": "method.archestra-user-to-id.basic",
"method": "ArchestrAUserToId",
"commandKind": "MX_COMMAND_KIND_ARCHESTRA_USER_TO_ID",
"status": "planned_fixture",
"captureReferences": [
"captures/mxaccess-user-map-administrator.log",
"captures/mxaccess-user-map-invalid.log"
],
"assertions": [
"preserve returned user id in returnValue and ArchestrAUserToIdReply",
"preserve invalid user GUID HRESULT/status behavior",
"compare raw mapping behavior without normalizing unknown users"
]
}
],
"eventFixtures": [
{
"id": "event.on-data-change.scalar",
"family": "MX_EVENT_FAMILY_ON_DATA_CHANGE",
"status": "planned_fixture",
"captureReferences": [
"captures/003-subscribe-scalars/harness.log",
"captures/106-native-subscribe-testint-current/harness.log"
],
"assertions": [
"preserve value, quality, timestamp, status array, and worker sequence"
]
},
{
"id": "event.on-write-complete.status",
"family": "MX_EVENT_FAMILY_ON_WRITE_COMPLETE",
"status": "planned_fixture",
"captureReferences": [
"captures/008-write-test-int-same-value/harness.log",
"captures/107-native-write-testint-current/harness.log"
],
"assertions": [
"preserve write-complete status array and optional HRESULT"
]
},
{
"id": "event.operation-complete.native-trigger-gap",
"family": "MX_EVENT_FAMILY_OPERATION_COMPLETE",
"status": "documented_gap",
"captureReferences": [
"captures/077-frida-suspend-advised-scanstate/harness.log",
"captures/118-frida-suspend-advised-scanstate-long/harness.log"
],
"assertions": [
"do not synthesize OperationComplete from Write or OnWriteComplete",
"upgrade this gap when a public MXAccess trigger emits event family 3"
]
},
{
"id": "event.on-buffered-data-change.batch-gap",
"family": "MX_EVENT_FAMILY_ON_BUFFERED_DATA_CHANGE",
"status": "documented_gap",
"captureReferences": [
"captures/120-frida-buffered-history-testhistoryvalue/harness.log",
"captures/122-frida-buffered-history-testhistoryvalue-plainadvise/harness.log"
],
"assertions": [
"preserve raw buffered metadata until a public multi-sample event payload is observed",
"upgrade this gap when OnBufferedDataChange batches are captured from MXAccess"
]
}
],
"scenarioGroups": [
{
"id": "invalid_handles",
"description": "Invalid server, item, post-remove, and invalid-reference cases keep MXAccess-owned HRESULT and status behavior.",
"fixtureIds": [
"method.add-item.scalar",
"method.remove-item.basic",
"method.unadvise.basic",
"method.write.value-status-matrix",
"method.unregister.basic"
],
"captureReferences": [
"captures/006-add-invalid/harness.log",
"captures/007-subscribe-invalid/harness.log",
"captures/109-native-post-remove-errors/harness.log",
"captures/110-native-invalid-handle-errors/harness.log"
]
},
{
"id": "write_statuses",
"description": "Write success, wrong type, invalid reference, scalar arrays, and completion-status cases compare HRESULT, status array, value projection, and event shape.",
"fixtureIds": [
"method.write.value-status-matrix",
"method.write2.timestamped",
"event.on-write-complete.status"
],
"captureReferences": [
"captures/089-frida-write-testint-wrong-type/harness.log",
"captures/090-frida-write-invalid-reference/harness.log",
"captures/091-frida-write-testint-double-type/harness.log",
"captures/097-frida-write-bool-array-pattern/harness.log",
"captures/107-native-write-testint-current/harness.log"
]
},
{
"id": "secured_writes",
"description": "Secured writes include observed WriteSecured rejection and authenticated WriteSecured2 success paths without logging credential-bearing values.",
"fixtureIds": [
"method.write-secured.rejection-gap",
"method.write-secured2.authenticated",
"method.authenticate-user.basic"
],
"captureReferences": [
"captures/036-frida-write-secured-test-int/harness.log",
"captures/111-frida-write-secured-auth-protectedvalue/harness.log",
"captures/113-frida-write-secured2-auth-protectedvalue/harness.log",
"captures/117-frida-write-secured2-auth-testint/harness.log"
]
},
{
"id": "add_item_context",
"description": "Context-bearing item registration compares AddItem2 and buffered AddBufferedItem argument preservation.",
"fixtureIds": [
"method.add-item2.context",
"method.add-buffered-item.context"
],
"captureReferences": [
"captures/mxaccess-additem2-testint-context.log",
"captures/121-frida-buffered-history-testhistoryvalue-context/harness.log"
]
},
{
"id": "buffered_registration",
"description": "Buffered registration and interval setup are tracked separately from normal advice until a public buffered data-change batch is captured.",
"fixtureIds": [
"method.add-buffered-item.context",
"method.set-buffered-update-interval.basic",
"event.on-buffered-data-change.batch-gap"
],
"captureReferences": [
"captures/079-frida-add-buffered-advise-testint/harness.log",
"captures/120-frida-buffered-history-testhistoryvalue/harness.log",
"captures/122-frida-buffered-history-testhistoryvalue-plainadvise/harness.log"
]
}
]
}
+56 -6
View File
@@ -1,8 +1,8 @@
# Python Client
The Python client package contains generated MXAccess Gateway protobuf
bindings, the `mxgateway` package scaffold, and the `mxgw-py` test CLI
scaffold. The package uses the shared proto inputs documented in
bindings, the async `mxgateway` package, and the `mxgw-py` test CLI. The
package uses the shared proto inputs documented in
`../../docs/client-proto-generation.md` so gateway and client contracts stay in
sync.
@@ -43,15 +43,65 @@ python -m pytest
python -m pip wheel . --no-deps --wheel-dir "$env:TEMP\mxgateway-python-wheel"
```
The scaffold tests import the generated gateway and worker stubs and exercise
the deterministic CLI version output.
The tests import the generated gateway and worker stubs, run fake async gateway
stubs, verify API key metadata, exercise stream cancellation, load shared value
and command fixtures, and check deterministic CLI output.
## Library Usage
The library is async-first:
```python
from mxgateway import GatewayClient
async with await GatewayClient.connect(
endpoint="localhost:5000",
api_key="mxgw_example",
plaintext=True,
) as client:
session = await client.open_session(client_session_name="python-client")
try:
server_handle = await session.register("python-client")
item_handle = await session.add_item(server_handle, "Object.Attribute")
await session.advise(server_handle, item_handle)
finally:
await session.close()
```
`GatewayClient.open_session_raw`, `GatewayClient.invoke_raw`, and
`GatewayClient.stream_events_raw` keep the generated protobuf replies and
events available for parity tests. `Session` helpers call the method-specific
MXAccess commands and preserve raw replies on typed command exceptions.
Canceling a Python task cancels the client-side gRPC call or stream wait. It
does not abort an in-flight MXAccess COM call inside the worker process.
## Authentication And TLS
`ClientOptions.api_key` adds this metadata to unary calls and streams:
```text
authorization: Bearer <api-key>
```
The client supports plaintext channels for local development, TLS with system
roots, TLS with a custom `ca_file`, and an optional test server name override.
API keys are redacted from option repr output and CLI error output.
## CLI
The scaffold CLI exposes version information:
The CLI emits deterministic JSON for automation:
```powershell
mxgw-py version --json
mxgw-py open-session --endpoint localhost:5000 --plaintext --json
mxgw-py register --session-id <id> --client-name python-client --json
mxgw-py add-item --session-id <id> --server-handle 1 --item Object.Attribute --json
mxgw-py advise --session-id <id> --server-handle 1 --item-handle 2 --json
mxgw-py stream-events --session-id <id> --max-events 1 --json
mxgw-py write --session-id <id> --server-handle 1 --item-handle 2 --type int32 --value 123 --json
```
Additional commands are implemented with the async client/session wrapper work.
Use `--api-key` or `--api-key-env MXGATEWAY_API_KEY` to attach API key
metadata. `smoke` opens a session, registers, adds an item, advises, streams a
bounded event count, and closes the session in a `finally` block.
+34 -1
View File
@@ -1,5 +1,38 @@
"""MXAccess Gateway Python client package."""
from .auth import ApiKey, auth_metadata
from .client import GatewayClient
from .errors import (
MxAccessError,
MxGatewayAuthenticationError,
MxGatewayAuthorizationError,
MxGatewayCommandError,
MxGatewayError,
MxGatewaySessionError,
MxGatewayTransportError,
MxGatewayWorkerError,
)
from .options import ClientOptions
from .session import Session
from .values import MxValueView, from_mx_value, to_mx_value
from .version import __version__
__all__ = ["__version__"]
__all__ = [
"ApiKey",
"ClientOptions",
"GatewayClient",
"MxAccessError",
"MxGatewayAuthenticationError",
"MxGatewayAuthorizationError",
"MxGatewayCommandError",
"MxGatewayError",
"MxGatewaySessionError",
"MxGatewayTransportError",
"MxGatewayWorkerError",
"MxValueView",
"Session",
"__version__",
"auth_metadata",
"from_mx_value",
"to_mx_value",
]
+58
View File
@@ -0,0 +1,58 @@
"""Authentication metadata helpers for MXAccess Gateway clients."""
from collections.abc import Sequence
from dataclasses import dataclass
AUTHORIZATION_HEADER = "authorization"
REDACTED = "[redacted]"
@dataclass(frozen=True)
class ApiKey:
"""API key wrapper that avoids leaking the secret through repr output."""
value: str
def __post_init__(self) -> None:
if not self.value:
raise ValueError("api_key must not be empty")
def __repr__(self) -> str:
return f"{type(self).__name__}({REDACTED!r})"
def bearer_value(self) -> str:
return f"Bearer {self.value}"
def auth_metadata(api_key: str | ApiKey | None) -> tuple[tuple[str, str], ...]:
"""Return gRPC metadata for API key auth."""
if api_key is None:
return ()
key = api_key.value if isinstance(api_key, ApiKey) else api_key
if not key:
return ()
return ((AUTHORIZATION_HEADER, f"Bearer {key}"),)
def merge_metadata(
api_key: str | ApiKey | None,
metadata: Sequence[tuple[str, str]] | None = None,
) -> tuple[tuple[str, str], ...]:
"""Merge caller metadata with API key metadata."""
merged = list(metadata or ())
merged.extend(auth_metadata(api_key))
return tuple(merged)
def redact_secret(text: str, secrets: Sequence[str | None]) -> str:
"""Replace known secret values with a stable redaction marker."""
redacted = text
for secret in secrets:
if secret:
redacted = redacted.replace(secret, REDACTED)
return redacted
+165
View File
@@ -0,0 +1,165 @@
"""Async MXAccess Gateway client wrapper."""
from __future__ import annotations
import asyncio
from collections.abc import AsyncIterator, Sequence
from typing import Any
import grpc
from .auth import merge_metadata
from .errors import ensure_protocol_success, map_rpc_error
from .generated import mxaccess_gateway_pb2 as pb
from .generated import mxaccess_gateway_pb2_grpc as pb_grpc
from .options import ClientOptions, create_channel
class GatewayClient:
"""Async client for the public MXAccess Gateway gRPC API."""
def __init__(
self,
*,
options: ClientOptions,
stub: Any,
channel: grpc.aio.Channel | None = None,
) -> None:
self.options = options
self.raw_stub = stub
self._channel = channel
self._closed = False
@classmethod
async def connect(
cls,
options: ClientOptions | None = None,
*,
endpoint: str | None = None,
api_key: str | None = None,
plaintext: bool = False,
ca_file: str | None = None,
server_name_override: str | None = None,
stub: Any | None = None,
) -> "GatewayClient":
"""Create a client with either a real async channel or a supplied fake stub."""
resolved = options or ClientOptions(
endpoint=endpoint or "",
api_key=api_key,
plaintext=plaintext,
ca_file=ca_file,
server_name_override=server_name_override,
)
if stub is not None:
return cls(options=resolved, stub=stub)
channel = create_channel(resolved)
return cls(
options=resolved,
stub=pb_grpc.MxAccessGatewayStub(channel),
channel=channel,
)
async def __aenter__(self) -> "GatewayClient":
return self
async def __aexit__(self, *_exc_info: object) -> None:
await self.close()
async def close(self) -> None:
"""Close the owned gRPC channel."""
if self._closed:
return
self._closed = True
if self._channel is not None:
await self._channel.close()
async def open_session(
self,
request: pb.OpenSessionRequest | None = None,
*,
requested_backend: str = "",
client_session_name: str = "",
client_correlation_id: str = "",
) -> "Session":
"""Open a gateway session and return a high-level session wrapper."""
from .session import Session
raw_request = request or pb.OpenSessionRequest(
requested_backend=requested_backend,
client_session_name=client_session_name,
client_correlation_id=client_correlation_id,
)
reply = await self.open_session_raw(raw_request)
return Session(client=self, session_id=reply.session_id, open_reply=reply)
async def open_session_raw(self, request: pb.OpenSessionRequest) -> pb.OpenSessionReply:
reply = await self._unary("open session", self.raw_stub.OpenSession, request)
ensure_protocol_success("open session", reply.protocol_status, reply)
return reply
async def close_session_raw(
self,
request: pb.CloseSessionRequest,
) -> pb.CloseSessionReply:
reply = await self._unary("close session", self.raw_stub.CloseSession, request)
ensure_protocol_success("close session", reply.protocol_status, reply)
return reply
async def invoke_raw(self, request: pb.MxCommandRequest) -> pb.MxCommandReply:
reply = await self._unary("invoke", self.raw_stub.Invoke, request)
ensure_protocol_success("invoke", reply.protocol_status, reply)
return reply
def stream_events_raw(
self,
request: pb.StreamEventsRequest,
*,
metadata: Sequence[tuple[str, str]] | None = None,
) -> AsyncIterator[pb.MxEvent]:
"""Return an async event iterator and cancel the stream when iteration stops."""
call = self.raw_stub.StreamEvents(
request,
metadata=merge_metadata(self.options.api_key, metadata),
)
return _canceling_iterator(call)
async def _unary(
self,
operation: str,
method: Any,
request: Any,
*,
metadata: Sequence[tuple[str, str]] | None = None,
) -> Any:
call = method(
request,
metadata=merge_metadata(self.options.api_key, metadata),
)
try:
return await call
except asyncio.CancelledError:
cancel = getattr(call, "cancel", None)
if cancel is not None:
cancel()
raise
except grpc.RpcError as error:
raise map_rpc_error(operation, error) from error
async def _canceling_iterator(call: Any) -> AsyncIterator[pb.MxEvent]:
try:
async for event in call:
yield event
except grpc.RpcError as error:
raise map_rpc_error("stream events", error) from error
finally:
cancel = getattr(call, "cancel", None)
if cancel is not None:
cancel()
+157
View File
@@ -0,0 +1,157 @@
"""Typed exception model for MXAccess Gateway Python clients."""
from __future__ import annotations
from typing import Any
import grpc
from .generated import mxaccess_gateway_pb2 as pb
class MxGatewayError(Exception):
"""Base class for client wrapper errors."""
def __init__(
self,
message: str,
*,
protocol_status: pb.ProtocolStatus | None = None,
raw_reply: Any | None = None,
) -> None:
super().__init__(message)
self.protocol_status = protocol_status
self.raw_reply = raw_reply
class MxGatewayTransportError(MxGatewayError):
"""Transport-level gRPC failure."""
class MxGatewayAuthenticationError(MxGatewayTransportError):
"""Authentication failure reported by gRPC."""
class MxGatewayAuthorizationError(MxGatewayTransportError):
"""Authorization failure reported by gRPC."""
class MxGatewaySessionError(MxGatewayError):
"""Gateway session failure."""
class MxGatewayWorkerError(MxGatewayError):
"""Gateway worker process or protocol failure."""
class MxGatewayCommandError(MxGatewayError):
"""Command failure that preserves the raw protobuf reply."""
class MxAccessError(MxGatewayCommandError):
"""MXAccess HRESULT or status failure."""
def map_rpc_error(operation: str, error: grpc.RpcError) -> MxGatewayTransportError:
"""Map a generated gRPC exception to the client exception hierarchy."""
code = error.code() if hasattr(error, "code") else None
details = error.details() if hasattr(error, "details") else str(error)
message = f"{operation} failed: {details}"
if code == grpc.StatusCode.UNAUTHENTICATED:
return MxGatewayAuthenticationError(message)
if code == grpc.StatusCode.PERMISSION_DENIED:
return MxGatewayAuthorizationError(message)
return MxGatewayTransportError(message)
def ensure_protocol_success(
operation: str,
protocol_status: pb.ProtocolStatus | None,
raw_reply: Any | None = None,
) -> Any | None:
"""Raise typed gateway errors for non-OK protocol statuses."""
code = (
protocol_status.code
if protocol_status is not None
else pb.PROTOCOL_STATUS_CODE_UNSPECIFIED
)
if code in (
pb.PROTOCOL_STATUS_CODE_OK,
pb.PROTOCOL_STATUS_CODE_MXACCESS_FAILURE,
):
return raw_reply
message_text = protocol_status.message if protocol_status else ""
message = f"{operation} failed: {message_text or pb.ProtocolStatusCode.Name(code)}"
if code in (
pb.PROTOCOL_STATUS_CODE_SESSION_NOT_FOUND,
pb.PROTOCOL_STATUS_CODE_SESSION_NOT_READY,
):
raise MxGatewaySessionError(
message,
protocol_status=protocol_status,
raw_reply=raw_reply,
)
if code in (
pb.PROTOCOL_STATUS_CODE_WORKER_UNAVAILABLE,
pb.PROTOCOL_STATUS_CODE_TIMEOUT,
pb.PROTOCOL_STATUS_CODE_CANCELED,
pb.PROTOCOL_STATUS_CODE_PROTOCOL_VIOLATION,
):
raise MxGatewayWorkerError(
message,
protocol_status=protocol_status,
raw_reply=raw_reply,
)
raise MxGatewayCommandError(
message,
protocol_status=protocol_status,
raw_reply=raw_reply,
)
def ensure_mxaccess_success(operation: str, reply: pb.MxCommandReply) -> pb.MxCommandReply:
"""Raise `MxAccessError` when MXAccess returned HRESULT or status failure."""
status = reply.protocol_status
if status.code == pb.PROTOCOL_STATUS_CODE_MXACCESS_FAILURE:
raise MxAccessError(
_mxaccess_message(operation, reply),
protocol_status=status,
raw_reply=reply,
)
if reply.HasField("hresult") and reply.hresult < 0:
raise MxAccessError(
_mxaccess_message(operation, reply),
protocol_status=status,
raw_reply=reply,
)
for mx_status in reply.statuses:
if mx_status.success == 0:
raise MxAccessError(
_mxaccess_message(operation, reply),
protocol_status=status,
raw_reply=reply,
)
return reply
def _mxaccess_message(operation: str, reply: pb.MxCommandReply) -> str:
status_text = reply.protocol_status.message or "MXAccess command failed"
hresult = reply.hresult if reply.HasField("hresult") else None
return (
f"{operation} failed: {status_text}; "
f"session={reply.session_id}; correlation={reply.correlation_id}; "
f"hresult={hresult}; statuses={len(reply.statuses)}"
)
+59
View File
@@ -0,0 +1,59 @@
"""Client connection options for the async Python wrapper."""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
import grpc
from .auth import REDACTED, ApiKey
@dataclass(frozen=True)
class ClientOptions:
"""Connection settings for `GatewayClient.connect`."""
endpoint: str
api_key: str | ApiKey | None = None
plaintext: bool = False
ca_file: str | None = None
server_name_override: str | None = None
def __post_init__(self) -> None:
if not self.endpoint:
raise ValueError("endpoint must not be empty")
if self.plaintext and self.ca_file:
raise ValueError("ca_file cannot be used with plaintext connections")
def __repr__(self) -> str:
api_key = REDACTED if self.api_key else None
return (
f"{type(self).__name__}(endpoint={self.endpoint!r}, "
f"api_key={api_key!r}, plaintext={self.plaintext!r}, "
f"ca_file={self.ca_file!r}, "
f"server_name_override={self.server_name_override!r})"
)
def create_channel(options: ClientOptions) -> grpc.aio.Channel:
"""Create a plaintext or TLS `grpc.aio` channel from client options."""
channel_options: list[tuple[str, str]] = []
if options.server_name_override:
channel_options.append(("grpc.ssl_target_name_override", options.server_name_override))
if options.plaintext:
return grpc.aio.insecure_channel(options.endpoint, options=channel_options)
root_certificates = None
if options.ca_file:
root_certificates = Path(options.ca_file).read_bytes()
credentials = grpc.ssl_channel_credentials(root_certificates=root_certificates)
return grpc.aio.secure_channel(
options.endpoint,
credentials,
options=channel_options,
)
+209
View File
@@ -0,0 +1,209 @@
"""Async session wrapper for MXAccess Gateway commands."""
from __future__ import annotations
from collections.abc import AsyncIterator
from .errors import ensure_mxaccess_success
from .generated import mxaccess_gateway_pb2 as pb
from .values import MxValueInput, to_mx_value
class Session:
"""A single gateway-backed MXAccess session."""
def __init__(
self,
*,
client: "GatewayClient",
session_id: str,
open_reply: pb.OpenSessionReply | None = None,
) -> None:
self.client = client
self.session_id = session_id
self.open_reply = open_reply
self._closed = False
async def __aenter__(self) -> "Session":
return self
async def __aexit__(self, *_exc_info: object) -> None:
await self.close()
async def close(self, *, client_correlation_id: str = "") -> pb.CloseSessionReply:
"""Close the gateway session. Repeated calls return a local closed reply."""
if self._closed:
return pb.CloseSessionReply(
session_id=self.session_id,
final_state=pb.SESSION_STATE_CLOSED,
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
)
self._closed = True
return await self.client.close_session_raw(
pb.CloseSessionRequest(
session_id=self.session_id,
client_correlation_id=client_correlation_id,
),
)
async def invoke(self, command: pb.MxCommand, *, correlation_id: str = "") -> pb.MxCommandReply:
"""Invoke a raw command and enforce gateway and MXAccess success."""
reply = await self.invoke_raw(command, correlation_id=correlation_id)
return ensure_mxaccess_success("invoke", reply)
async def invoke_raw(
self,
command: pb.MxCommand,
*,
correlation_id: str = "",
) -> pb.MxCommandReply:
"""Invoke a raw command and preserve the raw reply."""
return await self.client.invoke_raw(
pb.MxCommandRequest(
session_id=self.session_id,
client_correlation_id=correlation_id,
command=command,
),
)
async def register(self, client_name: str, *, correlation_id: str = "") -> int:
reply = await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_REGISTER,
register=pb.RegisterCommand(client_name=client_name),
),
correlation_id=correlation_id,
)
return reply.register.server_handle
async def unregister(self, server_handle: int, *, correlation_id: str = "") -> None:
await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_UNREGISTER,
unregister=pb.UnregisterCommand(server_handle=server_handle),
),
correlation_id=correlation_id,
)
async def add_item(
self,
server_handle: int,
item_definition: str,
*,
correlation_id: str = "",
) -> int:
reply = await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_ADD_ITEM,
add_item=pb.AddItemCommand(
server_handle=server_handle,
item_definition=item_definition,
),
),
correlation_id=correlation_id,
)
return reply.add_item.item_handle
async def add_item2(
self,
server_handle: int,
item_definition: str,
item_context: str,
*,
correlation_id: str = "",
) -> int:
reply = await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_ADD_ITEM2,
add_item2=pb.AddItem2Command(
server_handle=server_handle,
item_definition=item_definition,
item_context=item_context,
),
),
correlation_id=correlation_id,
)
return reply.add_item2.item_handle
async def advise(
self,
server_handle: int,
item_handle: int,
*,
correlation_id: str = "",
) -> None:
await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_ADVISE,
advise=pb.AdviseCommand(
server_handle=server_handle,
item_handle=item_handle,
),
),
correlation_id=correlation_id,
)
async def write(
self,
server_handle: int,
item_handle: int,
value: MxValueInput,
*,
user_id: int = 0,
correlation_id: str = "",
) -> None:
await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_WRITE,
write=pb.WriteCommand(
server_handle=server_handle,
item_handle=item_handle,
value=to_mx_value(value),
user_id=user_id,
),
),
correlation_id=correlation_id,
)
async def write2(
self,
server_handle: int,
item_handle: int,
value: MxValueInput,
timestamp_value: MxValueInput,
*,
user_id: int = 0,
correlation_id: str = "",
) -> None:
await self.invoke(
pb.MxCommand(
kind=pb.MX_COMMAND_KIND_WRITE2,
write2=pb.Write2Command(
server_handle=server_handle,
item_handle=item_handle,
value=to_mx_value(value),
timestamp_value=to_mx_value(timestamp_value),
user_id=user_id,
),
),
correlation_id=correlation_id,
)
def stream_events(
self,
*,
after_worker_sequence: int = 0,
) -> AsyncIterator[pb.MxEvent]:
return self.client.stream_events_raw(
pb.StreamEventsRequest(
session_id=self.session_id,
after_worker_sequence=after_worker_sequence,
),
)
from .client import GatewayClient # noqa: E402
+234
View File
@@ -0,0 +1,234 @@
"""MXAccess value conversion helpers."""
from __future__ import annotations
from collections.abc import Sequence
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any
from google.protobuf.timestamp_pb2 import Timestamp
from .generated import mxaccess_gateway_pb2 as pb
MxValueInput = bool | int | float | str | datetime | bytes | None | Sequence[Any]
@dataclass(frozen=True)
class MxValueView:
"""Typed projection of a raw `MxValue` protobuf message."""
value: Any
kind: str
raw: pb.MxValue
def to_mx_value(value: MxValueInput, *, data_type: str | None = None) -> pb.MxValue:
"""Convert a Python value into the public protobuf `MxValue` union."""
if isinstance(value, pb.MxValue):
return value
if value is None:
return pb.MxValue(
data_type=pb.MX_DATA_TYPE_NO_DATA,
variant_type="VT_EMPTY",
is_null=True,
raw_data_type=pb.MX_DATA_TYPE_NO_DATA,
)
if isinstance(value, bool):
return pb.MxValue(
data_type=_data_type(data_type, pb.MX_DATA_TYPE_BOOLEAN),
variant_type="VT_BOOL",
bool_value=value,
)
if isinstance(value, int):
if -(2**31) <= value <= (2**31 - 1):
return pb.MxValue(
data_type=_data_type(data_type, pb.MX_DATA_TYPE_INTEGER),
variant_type="VT_I4",
int32_value=value,
)
return pb.MxValue(
data_type=_data_type(data_type, pb.MX_DATA_TYPE_INTEGER),
variant_type="VT_I8",
int64_value=value,
)
if isinstance(value, float):
return pb.MxValue(
data_type=_data_type(data_type, pb.MX_DATA_TYPE_DOUBLE),
variant_type="VT_R8",
double_value=value,
)
if isinstance(value, str):
return pb.MxValue(
data_type=_data_type(data_type, pb.MX_DATA_TYPE_STRING),
variant_type="VT_BSTR",
string_value=value,
)
if isinstance(value, datetime):
return pb.MxValue(
data_type=_data_type(data_type, pb.MX_DATA_TYPE_TIME),
variant_type="VT_DATE",
timestamp_value=_timestamp_from_datetime(value),
)
if isinstance(value, bytes):
return pb.MxValue(
data_type=_data_type(data_type, pb.MX_DATA_TYPE_UNKNOWN),
variant_type="VT_RECORD",
raw_value=value,
)
if isinstance(value, Sequence):
return _sequence_to_mx_value(value, data_type=data_type)
raise TypeError(f"unsupported MxValue input type: {type(value).__name__}")
def from_mx_value(value: pb.MxValue) -> MxValueView:
"""Project a protobuf `MxValue` into an idiomatic Python value."""
kind = value.WhichOneof("kind")
if kind is None:
return MxValueView(None, "none", value)
if kind == "timestamp_value":
return MxValueView(
value.timestamp_value.ToDatetime().replace(tzinfo=timezone.utc),
kind,
value,
)
if kind == "array_value":
return MxValueView(from_mx_array(value.array_value), kind, value)
return MxValueView(getattr(value, kind), kind, value)
def from_mx_array(array: pb.MxArray) -> list[Any]:
"""Project a protobuf `MxArray` into a Python list."""
kind = array.WhichOneof("values")
if kind is None:
return []
values = list(getattr(array, kind).values)
if kind == "timestamp_values":
return [
timestamp.ToDatetime().replace(tzinfo=timezone.utc)
for timestamp in values
]
return values
def _sequence_to_mx_value(
values: Sequence[Any],
*,
data_type: str | None,
) -> pb.MxValue:
sequence = list(values)
if not sequence:
return pb.MxValue(
data_type=_data_type(data_type, pb.MX_DATA_TYPE_UNKNOWN),
array_value=pb.MxArray(
element_data_type=pb.MX_DATA_TYPE_UNKNOWN,
dimensions=[0],
),
)
first = sequence[0]
dimensions = [len(sequence)]
if all(isinstance(item, bool) for item in sequence):
array = pb.MxArray(
element_data_type=pb.MX_DATA_TYPE_BOOLEAN,
variant_type="VT_ARRAY|VT_BOOL",
dimensions=dimensions,
bool_values=pb.BoolArray(values=sequence),
)
return pb.MxValue(data_type=pb.MX_DATA_TYPE_BOOLEAN, array_value=array)
if all(isinstance(item, int) and not isinstance(item, bool) for item in sequence):
use_int32 = all(-(2**31) <= item <= (2**31 - 1) for item in sequence)
if use_int32:
array = pb.MxArray(
element_data_type=pb.MX_DATA_TYPE_INTEGER,
variant_type="VT_ARRAY|VT_I4",
dimensions=dimensions,
int32_values=pb.Int32Array(values=sequence),
)
else:
array = pb.MxArray(
element_data_type=pb.MX_DATA_TYPE_INTEGER,
variant_type="VT_ARRAY|VT_I8",
dimensions=dimensions,
int64_values=pb.Int64Array(values=sequence),
)
return pb.MxValue(data_type=pb.MX_DATA_TYPE_INTEGER, array_value=array)
if all(isinstance(item, float) for item in sequence):
array = pb.MxArray(
element_data_type=pb.MX_DATA_TYPE_DOUBLE,
variant_type="VT_ARRAY|VT_R8",
dimensions=dimensions,
double_values=pb.DoubleArray(values=sequence),
)
return pb.MxValue(data_type=pb.MX_DATA_TYPE_DOUBLE, array_value=array)
if all(isinstance(item, str) for item in sequence):
array = pb.MxArray(
element_data_type=pb.MX_DATA_TYPE_STRING,
variant_type="VT_ARRAY|VT_BSTR",
dimensions=dimensions,
string_values=pb.StringArray(values=sequence),
)
return pb.MxValue(data_type=pb.MX_DATA_TYPE_STRING, array_value=array)
if all(isinstance(item, datetime) for item in sequence):
array = pb.MxArray(
element_data_type=pb.MX_DATA_TYPE_TIME,
variant_type="VT_ARRAY|VT_DATE",
dimensions=dimensions,
timestamp_values=pb.TimestampArray(
values=[_timestamp_from_datetime(item) for item in sequence],
),
)
return pb.MxValue(data_type=pb.MX_DATA_TYPE_TIME, array_value=array)
if all(isinstance(item, bytes) for item in sequence):
array = pb.MxArray(
element_data_type=pb.MX_DATA_TYPE_UNKNOWN,
variant_type="VT_ARRAY|VT_VARIANT",
dimensions=dimensions,
raw_values=pb.RawArray(values=sequence),
)
return pb.MxValue(data_type=pb.MX_DATA_TYPE_UNKNOWN, array_value=array)
raise TypeError(
"MxValue array inputs must use one supported element type; "
f"first element was {type(first).__name__}"
)
def _timestamp_from_datetime(value: datetime) -> Timestamp:
timestamp = Timestamp()
if value.tzinfo is None:
value = value.replace(tzinfo=timezone.utc)
timestamp.FromDatetime(value.astimezone(timezone.utc))
return timestamp
def _data_type(name: str | None, default: int) -> int:
if name is None:
return default
return pb.MxDataType.Value(name)
+437 -2
View File
@@ -1,10 +1,24 @@
"""CLI scaffold for the MXAccess Gateway Python client."""
"""Command line interface for the MXAccess Gateway Python client."""
from __future__ import annotations
import asyncio
import json
import os
from collections.abc import Awaitable, Callable
from datetime import datetime, timezone
from typing import Any
import click
from google.protobuf.json_format import MessageToDict
from mxgateway import __version__
from mxgateway.auth import redact_secret
from mxgateway.client import GatewayClient
from mxgateway.errors import MxGatewayError
from mxgateway.generated import mxaccess_gateway_pb2 as pb
from mxgateway.options import ClientOptions
from mxgateway.values import MxValueInput
@click.group()
@@ -16,14 +30,435 @@ def main() -> None:
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def version(output_json: bool) -> None:
"""Print client package version information."""
payload = {
"client": "mxgw-py",
"package": "mxaccess-gateway-client",
"version": __version__,
}
_emit(payload, output_json=output_json, text=f"mxgw-py {__version__}")
def gateway_options(command: Callable[..., Any]) -> Callable[..., Any]:
command = click.option("--endpoint", default="localhost:5000", show_default=True)(command)
command = click.option("--api-key", default=None, help="Gateway API key.")(command)
command = click.option(
"--api-key-env",
default=None,
help="Environment variable containing the gateway API key.",
)(command)
command = click.option("--plaintext", is_flag=True, help="Use plaintext gRPC.")(command)
command = click.option("--tls", "use_tls", is_flag=True, help="Use TLS gRPC.")(command)
command = click.option("--ca-file", default=None, help="Custom root certificate file.")(command)
command = click.option(
"--server-name-override",
default=None,
help="TLS server name override for test environments.",
)(command)
return command
@main.command("open-session")
@gateway_options
@click.option("--client-name", default="", help="Client session name.")
@click.option("--requested-backend", default="", help="Requested backend name.")
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def open_session(**kwargs: Any) -> None:
"""Open a gateway session."""
_run(
_open_session(**kwargs),
output_json=kwargs["output_json"],
secrets=_secrets(kwargs),
)
@main.command("close-session")
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def close_session(**kwargs: Any) -> None:
"""Close a gateway session."""
_run(
_close_session(**kwargs),
output_json=kwargs["output_json"],
secrets=_secrets(kwargs),
)
@main.command()
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--message", default="ping", show_default=True, help="Ping payload.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def ping(**kwargs: Any) -> None:
"""Send a diagnostic ping command."""
_run(_ping(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
@main.command()
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--client-name", required=True, help="MXAccess client name.")
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def register(**kwargs: Any) -> None:
"""Invoke MXAccess Register."""
_run(
_register(**kwargs),
output_json=kwargs["output_json"],
secrets=_secrets(kwargs),
)
@main.command("add-item")
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
@click.option("--item", required=True, help="MXAccess item definition.")
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def add_item(**kwargs: Any) -> None:
"""Invoke MXAccess AddItem."""
_run(
_add_item(**kwargs),
output_json=kwargs["output_json"],
secrets=_secrets(kwargs),
)
@main.command()
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
@click.option("--item-handle", required=True, type=int, help="MXAccess item handle.")
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def advise(**kwargs: Any) -> None:
"""Invoke MXAccess Advise."""
_run(_advise(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
@main.command("stream-events")
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--after-worker-sequence", default=0, type=int, show_default=True)
@click.option("--max-events", default=1, type=int, show_default=True)
@click.option("--timeout", default=5.0, type=float, show_default=True)
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def stream_events(**kwargs: Any) -> None:
"""Stream a bounded number of events."""
_run(
_stream_events(**kwargs),
output_json=kwargs["output_json"],
secrets=_secrets(kwargs),
)
@main.command()
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
@click.option("--item-handle", required=True, type=int, help="MXAccess item handle.")
@click.option("--type", "value_type", default="string", show_default=True)
@click.option("--value", required=True, help="Value to write.")
@click.option("--user-id", default=0, type=int, show_default=True)
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def write(**kwargs: Any) -> None:
"""Invoke MXAccess Write."""
_run(_write(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
@main.command()
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
@click.option("--item-handle", required=True, type=int, help="MXAccess item handle.")
@click.option("--type", "value_type", default="string", show_default=True)
@click.option("--value", required=True, help="Value to write.")
@click.option("--timestamp", required=True, help="ISO-8601 timestamp value.")
@click.option("--user-id", default=0, type=int, show_default=True)
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def write2(**kwargs: Any) -> None:
"""Invoke MXAccess Write2."""
_run(_write2(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
@main.command()
@gateway_options
@click.option("--client-name", default="mxgw-py-smoke", show_default=True)
@click.option("--item", required=True, help="MXAccess item definition.")
@click.option("--max-events", default=1, type=int, show_default=True)
@click.option("--timeout", default=5.0, type=float, show_default=True)
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def smoke(**kwargs: Any) -> None:
"""Run a bounded open/register/add/advise/stream/close smoke flow."""
_run(_smoke(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
async def _open_session(**kwargs: Any) -> dict[str, Any]:
async with await _connect(kwargs) as client:
reply = await client.open_session_raw(
pb.OpenSessionRequest(
requested_backend=kwargs["requested_backend"],
client_session_name=kwargs["client_name"],
client_correlation_id=kwargs["correlation_id"],
),
)
return {"sessionId": reply.session_id, "rawReply": _message_dict(reply)}
async def _close_session(**kwargs: Any) -> dict[str, Any]:
async with await _connect(kwargs) as client:
reply = await client.close_session_raw(
pb.CloseSessionRequest(
session_id=kwargs["session_id"],
client_correlation_id=kwargs["correlation_id"],
),
)
return {"sessionId": reply.session_id, "rawReply": _message_dict(reply)}
async def _ping(**kwargs: Any) -> dict[str, Any]:
async with await _connect(kwargs) as client:
reply = await client.invoke_raw(
pb.MxCommandRequest(
session_id=kwargs["session_id"],
command=pb.MxCommand(
kind=pb.MX_COMMAND_KIND_PING,
ping=pb.PingCommand(message=kwargs["message"]),
),
),
)
return {"kind": "ping", "rawReply": _message_dict(reply)}
async def _register(**kwargs: Any) -> dict[str, Any]:
async with await _connect(kwargs) as client:
session = _session(client, kwargs["session_id"])
server_handle = await session.register(
kwargs["client_name"],
correlation_id=kwargs["correlation_id"],
)
return {"serverHandle": server_handle}
async def _add_item(**kwargs: Any) -> dict[str, Any]:
async with await _connect(kwargs) as client:
session = _session(client, kwargs["session_id"])
item_handle = await session.add_item(
kwargs["server_handle"],
kwargs["item"],
correlation_id=kwargs["correlation_id"],
)
return {"itemHandle": item_handle}
async def _advise(**kwargs: Any) -> dict[str, Any]:
async with await _connect(kwargs) as client:
session = _session(client, kwargs["session_id"])
await session.advise(
kwargs["server_handle"],
kwargs["item_handle"],
correlation_id=kwargs["correlation_id"],
)
return {"ok": True}
async def _stream_events(**kwargs: Any) -> dict[str, Any]:
async with await _connect(kwargs) as client:
session = _session(client, kwargs["session_id"])
events = await _collect_events(
session.stream_events(after_worker_sequence=kwargs["after_worker_sequence"]),
max_events=kwargs["max_events"],
timeout=kwargs["timeout"],
)
return {"events": [_message_dict(event) for event in events]}
async def _write(**kwargs: Any) -> dict[str, Any]:
value = _parse_value(kwargs["value"], kwargs["value_type"])
async with await _connect(kwargs) as client:
session = _session(client, kwargs["session_id"])
await session.write(
kwargs["server_handle"],
kwargs["item_handle"],
value,
user_id=kwargs["user_id"],
correlation_id=kwargs["correlation_id"],
)
return {"ok": True}
async def _write2(**kwargs: Any) -> dict[str, Any]:
value = _parse_value(kwargs["value"], kwargs["value_type"])
timestamp = _parse_datetime(kwargs["timestamp"])
async with await _connect(kwargs) as client:
session = _session(client, kwargs["session_id"])
await session.write2(
kwargs["server_handle"],
kwargs["item_handle"],
value,
timestamp,
user_id=kwargs["user_id"],
correlation_id=kwargs["correlation_id"],
)
return {"ok": True}
async def _smoke(**kwargs: Any) -> dict[str, Any]:
async with await _connect(kwargs) as client:
session = await client.open_session(client_session_name=kwargs["client_name"])
closed = False
try:
server_handle = await session.register(kwargs["client_name"])
item_handle = await session.add_item(server_handle, kwargs["item"])
await session.advise(server_handle, item_handle)
events = await _collect_events(
session.stream_events(),
max_events=kwargs["max_events"],
timeout=kwargs["timeout"],
)
return {
"sessionId": session.session_id,
"serverHandle": server_handle,
"itemHandle": item_handle,
"events": [_message_dict(event) for event in events],
}
finally:
if not closed:
await session.close()
async def _connect(kwargs: dict[str, Any]) -> GatewayClient:
api_key = kwargs.get("api_key") or _api_key_from_env(kwargs.get("api_key_env"))
return await GatewayClient.connect(
ClientOptions(
endpoint=kwargs["endpoint"],
api_key=api_key,
plaintext=_use_plaintext(kwargs),
ca_file=kwargs.get("ca_file"),
server_name_override=kwargs.get("server_name_override"),
),
)
def _session(client: GatewayClient, session_id: str):
from mxgateway.session import Session
return Session(client=client, session_id=session_id)
def _use_plaintext(kwargs: dict[str, Any]) -> bool:
if kwargs.get("use_tls"):
return False
if kwargs.get("plaintext"):
return True
return kwargs["endpoint"].startswith("localhost:") or kwargs["endpoint"].startswith("127.0.0.1:")
def _api_key_from_env(name: str | None) -> str | None:
if not name:
return None
return os.environ.get(name)
def _secrets(kwargs: dict[str, Any]) -> list[str | None]:
return [
kwargs.get("api_key"),
_api_key_from_env(kwargs.get("api_key_env")),
]
def _run(
awaitable: Awaitable[dict[str, Any]],
*,
output_json: bool,
secrets: list[str | None],
) -> None:
try:
payload = asyncio.run(awaitable)
except MxGatewayError as error:
raise click.ClickException(redact_secret(str(error), secrets)) from error
_emit(payload, output_json=output_json)
def _emit(
payload: dict[str, Any],
*,
output_json: bool,
text: str | None = None,
) -> None:
if output_json:
click.echo(json.dumps(payload, sort_keys=True))
return
click.echo(f"mxgw-py {__version__}")
click.echo(text or json.dumps(payload, sort_keys=True))
async def _collect_events(
events: Any,
*,
max_events: int,
timeout: float,
) -> list[pb.MxEvent]:
collected: list[pb.MxEvent] = []
iterator = events.__aiter__()
try:
while len(collected) < max_events:
collected.append(await asyncio.wait_for(iterator.__anext__(), timeout=timeout))
except StopAsyncIteration:
pass
finally:
close = getattr(iterator, "aclose", None)
if close is not None:
await close()
return collected
def _parse_value(raw_value: str, value_type: str) -> MxValueInput:
normalized = value_type.lower()
if normalized == "bool":
return raw_value.lower() in ("1", "true", "yes", "on")
if normalized in ("int", "int32", "int64"):
return int(raw_value)
if normalized in ("float", "double"):
return float(raw_value)
if normalized in ("time", "timestamp"):
return _parse_datetime(raw_value)
if normalized == "raw":
return raw_value.encode("utf-8")
if normalized == "string":
return raw_value
raise click.BadParameter(f"unsupported value type: {value_type}", param_hint="--type")
def _parse_datetime(raw_value: str) -> datetime:
if raw_value.endswith("Z"):
raw_value = raw_value[:-1] + "+00:00"
parsed = datetime.fromisoformat(raw_value)
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed
def _message_dict(message: Any) -> dict[str, Any]:
return MessageToDict(
message,
preserving_proto_field_name=False,
use_integers_for_enums=False,
)
+103
View File
@@ -0,0 +1,103 @@
"""Tests for auth metadata and connection options."""
import pytest
from mxgateway.auth import REDACTED, ApiKey, auth_metadata, redact_secret
from mxgateway import options as options_module
from mxgateway.options import ClientOptions, create_channel
def test_auth_metadata_adds_bearer_api_key() -> None:
assert auth_metadata("mxgw_test_secret") == (
("authorization", "Bearer mxgw_test_secret"),
)
def test_api_key_repr_is_redacted() -> None:
api_key = ApiKey("mxgw_test_secret")
assert "mxgw_test_secret" not in repr(api_key)
assert REDACTED in repr(api_key)
def test_redact_secret_replaces_known_values() -> None:
redacted = redact_secret(
"authorization failed for mxgw_test_secret",
["mxgw_test_secret"],
)
assert redacted == f"authorization failed for {REDACTED}"
def test_client_options_reject_plaintext_with_ca_file() -> None:
with pytest.raises(ValueError, match="ca_file"):
ClientOptions(
endpoint="localhost:5000",
plaintext=True,
ca_file="ca.pem",
)
def test_client_options_repr_redacts_api_key() -> None:
options = ClientOptions(endpoint="localhost:5000", api_key="mxgw_test_secret")
assert "mxgw_test_secret" not in repr(options)
assert REDACTED in repr(options)
def test_create_channel_uses_plaintext_channel(monkeypatch: pytest.MonkeyPatch) -> None:
calls: list[tuple[str, object]] = []
def fake_insecure_channel(endpoint: str, *, options: object) -> str:
calls.append((endpoint, options))
return "plain-channel"
monkeypatch.setattr(
options_module.grpc.aio,
"insecure_channel",
fake_insecure_channel,
)
channel = create_channel(ClientOptions(endpoint="localhost:5000", plaintext=True))
assert channel == "plain-channel"
assert calls == [("localhost:5000", [])]
def test_create_channel_uses_tls_channel(monkeypatch: pytest.MonkeyPatch) -> None:
calls: list[tuple[str, object, object]] = []
def fake_credentials(*, root_certificates: object) -> str:
assert root_certificates is None
return "creds"
def fake_secure_channel(endpoint: str, credentials: object, *, options: object) -> str:
calls.append((endpoint, credentials, options))
return "tls-channel"
monkeypatch.setattr(
options_module.grpc,
"ssl_channel_credentials",
fake_credentials,
)
monkeypatch.setattr(
options_module.grpc.aio,
"secure_channel",
fake_secure_channel,
)
channel = create_channel(
ClientOptions(
endpoint="gateway.example:5001",
server_name_override="gateway.test",
),
)
assert channel == "tls-channel"
assert calls == [
(
"gateway.example:5001",
"creds",
[("grpc.ssl_target_name_override", "gateway.test")],
),
]
+48 -1
View File
@@ -1,4 +1,4 @@
"""Tests for the Python CLI scaffold."""
"""Tests for the Python CLI."""
import json
@@ -19,3 +19,50 @@ def test_version_json_is_deterministic() -> None:
"package": "mxaccess-gateway-client",
"version": __version__,
}
def test_write_parser_rejects_unknown_value_type() -> None:
runner = CliRunner()
result = runner.invoke(
main,
[
"write",
"--session-id",
"session-1",
"--server-handle",
"12",
"--item-handle",
"34",
"--type",
"unsupported",
"--value",
"123",
"--api-key",
"mxgw_test_secret",
"--json",
],
)
assert result.exit_code != 0
assert "unsupported value type" in result.output
def test_cli_error_output_redacts_api_key() -> None:
runner = CliRunner()
result = runner.invoke(
main,
[
"open-session",
"--endpoint",
"127.0.0.1:1",
"--api-key",
"mxgw_test_secret",
"--plaintext",
"--json",
],
)
assert result.exit_code != 0
assert "mxgw_test_secret" not in result.output
+225
View File
@@ -0,0 +1,225 @@
"""Tests for the async client and session wrappers."""
from __future__ import annotations
import asyncio
from typing import Any
import pytest
from mxgateway import ClientOptions, GatewayClient, MxAccessError
from mxgateway.generated import mxaccess_gateway_pb2 as pb
@pytest.mark.asyncio
async def test_session_helpers_send_auth_metadata_and_preserve_raw_replies() -> None:
stub = FakeGatewayStub()
client = await GatewayClient.connect(
ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True),
stub=stub,
)
session = await client.open_session(client_session_name="pytest")
server_handle = await session.register("pytest-client")
item_handle = await session.add_item(server_handle, "Object.Attribute")
await session.advise(server_handle, item_handle)
assert session.session_id == "session-1"
assert server_handle == 12
assert item_handle == 34
assert stub.open_session.metadata == (("authorization", "Bearer mxgw_test_secret"),)
assert stub.invoke.requests[0].command.register.client_name == "pytest-client"
assert stub.invoke.requests[1].command.add_item.item_definition == "Object.Attribute"
assert stub.invoke.requests[2].command.advise.item_handle == 34
@pytest.mark.asyncio
async def test_mxaccess_error_preserves_raw_reply() -> None:
stub = FakeGatewayStub()
failure_reply = pb.MxCommandReply(
session_id="session-1",
kind=pb.MX_COMMAND_KIND_WRITE,
protocol_status=pb.ProtocolStatus(
code=pb.PROTOCOL_STATUS_CODE_MXACCESS_FAILURE,
message="MXAccess rejected write.",
),
hresult=-1,
)
stub.invoke.replies = [failure_reply]
client = await GatewayClient.connect(
ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True),
stub=stub,
)
session = await client.open_session()
with pytest.raises(MxAccessError) as captured:
await session.write(12, 34, 123)
assert captured.value.raw_reply is failure_reply
@pytest.mark.asyncio
async def test_stream_events_cancels_underlying_call_when_closed() -> None:
stream = FakeStream(
[
pb.MxEvent(
session_id="session-1",
worker_sequence=1,
family=pb.MX_EVENT_FAMILY_ON_DATA_CHANGE,
),
],
)
stub = FakeGatewayStub(stream=stream)
client = await GatewayClient.connect(
ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True),
stub=stub,
)
session = await client.open_session()
events = session.stream_events()
first = await anext(events)
await events.aclose()
assert first.worker_sequence == 1
assert stream.cancelled
assert stub.stream_metadata == (("authorization", "Bearer mxgw_test_secret"),)
@pytest.mark.asyncio
async def test_unary_task_cancellation_reaches_fake_call() -> None:
blocking = BlockingCancellableUnary()
stub = FakeGatewayStub()
stub.OpenSession = blocking
client = await GatewayClient.connect(
ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True),
stub=stub,
)
task = asyncio.create_task(client.open_session())
await blocking.started.wait()
task.cancel()
with pytest.raises(asyncio.CancelledError):
await task
assert blocking.call is not None
assert blocking.call.cancelled
class FakeGatewayStub:
def __init__(self, stream: "FakeStream | None" = None) -> None:
self.open_session = FakeUnary(
[
pb.OpenSessionReply(
session_id="session-1",
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
),
],
)
self.close_session = FakeUnary(
[
pb.CloseSessionReply(
session_id="session-1",
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
),
],
)
self.invoke = FakeUnary(
[
pb.MxCommandReply(
session_id="session-1",
kind=pb.MX_COMMAND_KIND_REGISTER,
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
register=pb.RegisterReply(server_handle=12),
),
pb.MxCommandReply(
session_id="session-1",
kind=pb.MX_COMMAND_KIND_ADD_ITEM,
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
add_item=pb.AddItemReply(item_handle=34),
),
pb.MxCommandReply(
session_id="session-1",
kind=pb.MX_COMMAND_KIND_ADVISE,
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
),
],
)
self.OpenSession = self.open_session
self.CloseSession = self.close_session
self.Invoke = self.invoke
self._stream = stream or FakeStream([])
self.stream_metadata: tuple[tuple[str, str], ...] | None = None
def StreamEvents(
self,
request: pb.StreamEventsRequest,
*,
metadata: tuple[tuple[str, str], ...],
) -> "FakeStream":
self.stream_request = request
self.stream_metadata = metadata
return self._stream
class FakeUnary:
def __init__(self, replies: list[Any]) -> None:
self.replies = replies
self.requests: list[Any] = []
self.metadata: tuple[tuple[str, str], ...] | None = None
async def __call__(
self,
request: Any,
*,
metadata: tuple[tuple[str, str], ...],
) -> Any:
self.requests.append(request)
self.metadata = metadata
return self.replies.pop(0)
class BlockingCancellableUnary:
def __init__(self) -> None:
self.started = asyncio.Event()
self.call: BlockingCall | None = None
def __call__(self, *_args: Any, **_kwargs: Any) -> "BlockingCall":
self.call = BlockingCall(self.started)
return self.call
class BlockingCall:
def __init__(self, started: asyncio.Event) -> None:
self.started = started
self.cancelled = False
def __await__(self):
return self._wait().__await__()
async def _wait(self) -> Any:
self.started.set()
try:
await asyncio.Event().wait()
except asyncio.CancelledError:
raise
def cancel(self) -> None:
self.cancelled = True
class FakeStream:
def __init__(self, events: list[pb.MxEvent]) -> None:
self._events = events
self.cancelled = False
def __aiter__(self) -> "FakeStream":
return self
async def __anext__(self) -> pb.MxEvent:
if not self._events:
await asyncio.sleep(3600)
return self._events.pop(0)
def cancel(self) -> None:
self.cancelled = True
+49
View File
@@ -0,0 +1,49 @@
"""Tests for typed command error mapping."""
import json
from pathlib import Path
import pytest
from google.protobuf.json_format import ParseDict
from mxgateway.errors import ensure_mxaccess_success, ensure_protocol_success
from mxgateway import MxAccessError, MxGatewaySessionError
from mxgateway.generated import mxaccess_gateway_pb2 as pb
FIXTURE_ROOT = Path(__file__).resolve().parents[2] / "proto" / "fixtures" / "behavior"
def test_register_fixture_is_protocol_and_mxaccess_success() -> None:
reply = _load_reply("command-replies/register.ok.reply.json")
assert ensure_protocol_success("register", reply.protocol_status, reply) is reply
assert ensure_mxaccess_success("register", reply) is reply
def test_write_failure_fixture_preserves_raw_reply() -> None:
reply = _load_reply("command-replies/write.mxaccess-failure.reply.json")
assert ensure_protocol_success("write", reply.protocol_status, reply) is reply
with pytest.raises(MxAccessError) as captured:
ensure_mxaccess_success("write", reply)
assert captured.value.raw_reply is reply
assert captured.value.raw_reply.hresult == -2147220992
assert len(captured.value.raw_reply.statuses) == 2
def test_session_status_maps_to_session_error() -> None:
status = pb.ProtocolStatus(
code=pb.PROTOCOL_STATUS_CODE_SESSION_NOT_FOUND,
message="session missing",
)
with pytest.raises(MxGatewaySessionError) as captured:
ensure_protocol_success("invoke", status)
assert captured.value.protocol_status is status
def _load_reply(name: str) -> pb.MxCommandReply:
payload = json.loads((FIXTURE_ROOT / name).read_text(encoding="utf-8"))
return ParseDict(payload, pb.MxCommandReply())
+49
View File
@@ -0,0 +1,49 @@
"""Tests for MXAccess value conversion helpers."""
import json
import re
from datetime import datetime, timezone
from pathlib import Path
from google.protobuf.json_format import ParseDict
from mxgateway.generated import mxaccess_gateway_pb2 as pb
from mxgateway.values import from_mx_value, to_mx_value
FIXTURE_ROOT = Path(__file__).resolve().parents[2] / "proto" / "fixtures" / "behavior"
def test_value_conversion_fixtures_project_expected_oneof_kind() -> None:
payload = json.loads(
(FIXTURE_ROOT / "values" / "value-conversion-cases.json").read_text(
encoding="utf-8",
),
)
for case in payload["cases"]:
value = ParseDict(case["value"], pb.MxValue())
projection = from_mx_value(value)
assert projection.kind == _snake_case(case["expectedKind"])
assert projection.raw is value
def test_to_mx_value_supports_scalar_and_array_inputs() -> None:
assert to_mx_value(True).WhichOneof("kind") == "bool_value"
assert to_mx_value(12).int32_value == 12
assert to_mx_value(2**40).int64_value == 2**40
assert to_mx_value(12.5).double_value == 12.5
assert to_mx_value("abc").string_value == "abc"
assert to_mx_value([1, 2]).array_value.int32_values.values == [1, 2]
assert to_mx_value(["a", "b"]).array_value.string_values.values == ["a", "b"]
def test_to_mx_value_uses_utc_timestamps() -> None:
value = to_mx_value(datetime(2026, 1, 1, 0, 0, 4, tzinfo=timezone.utc))
assert value.data_type == pb.MX_DATA_TYPE_TIME
assert value.timestamp_value.seconds == 1767225604
def _snake_case(value: str) -> str:
return re.sub(r"(?<!^)(?=[A-Z])", "_", value).lower()
+8
View File
@@ -76,6 +76,13 @@ stdout/stderr lines emitted during the run.
## Focused Commands
Run the parity fixture matrix tests after changing the integration parity
scenario list:
```bash
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter FullyQualifiedName~ParityFixtureMatrixTests
```
Run the fake worker tests after changing gateway worker IPC, session startup, or
event streaming behavior:
@@ -95,6 +102,7 @@ dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj
## Related Documentation
- [Parity Fixture Matrix](./ParityFixtureMatrix.md)
- [Gateway Process Design](./gateway-process-design.md)
- [Worker Frame Protocol](./WorkerFrameProtocol.md)
- [MXAccess Worker Instance Detailed Design](./mxaccess-worker-instance-design.md)
+102
View File
@@ -0,0 +1,102 @@
# Parity Fixture Matrix
The parity fixture matrix defines the live-test scenarios used to compare
direct MXAccess behavior with the gateway-backed worker. It is a planning and
validation fixture, not a source of synthetic MXAccess behavior.
The matrix lives in
`clients/proto/fixtures/parity/parity-fixture-matrix.json`. It references the
local MXAccess capture set under
`C:/Users/dohertj2/Desktop/mxaccess/captures` and keeps capture paths relative
to that root so the repository does not copy raw capture artifacts.
## Scope
The matrix covers every public `LMXProxyServerClass` method represented by the
gateway contract:
- `Register`
- `Unregister`
- `AddItem`
- `AddItem2`
- `RemoveItem`
- `Advise`
- `UnAdvise`
- `AdviseSupervisory`
- `AddBufferedItem`
- `SetBufferedUpdateInterval`
- `Suspend`
- `Activate`
- `Write`
- `Write2`
- `WriteSecured`
- `WriteSecured2`
- `AuthenticateUser`
- `ArchestrAUserToId`
Each entry is either a `planned_fixture` or a `documented_gap`.
`WriteSecured` remains a documented gap because the current captures show
`0x80004021` before MXAccess emits a value-bearing write body.
`OperationComplete` and public `OnBufferedDataChange` batches also remain
documented gaps because no capture in the current set proves those public event
payloads from native MXAccess.
## Required Scenario Groups
The matrix pins the high-risk parity scenarios from the integration milestone:
| Scenario | Purpose |
|----------|---------|
| `invalid_handles` | Preserves invalid server, item, post-remove, and invalid-reference HRESULT/status behavior. |
| `write_statuses` | Compares successful writes, wrong-type writes, invalid references, arrays, and write-complete status arrays. |
| `secured_writes` | Covers observed `WriteSecured` rejection and authenticated `WriteSecured2` paths without logging credential-bearing values. |
| `add_item_context` | Ensures `AddItem2` and buffered registration pass context strings exactly as supplied. |
| `buffered_registration` | Tracks buffered item registration and interval setup separately from normal advice. |
## Comparison Format
Each live parity fixture should record one direct MXAccess result and one
gateway result for the same operation.
Direct MXAccess records include:
- method name,
- arguments after redaction,
- returned value,
- HRESULT,
- exception type,
- `MXSTATUS_PROXY[]` values,
- native event records in observed order.
Gateway records include:
- `MxCommandKind`,
- `ProtocolStatus`,
- `MxCommandReply.ReturnValue`,
- `MxCommandReply.Hresult`,
- repeated `MxCommandReply.Statuses`,
- safe diagnostic message,
- streamed `MxEvent` records in worker-sequence order.
Compare HRESULT, exception type, returned value, status array shape, raw status
fields, event family order, event payload shape, value projection, and raw
fallback metadata. The gateway must not convert an MXAccess command failure
into a transport failure when the worker captured HRESULT or status details.
## Validation
Run the parity fixture matrix tests after changing the matrix:
```bash
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter FullyQualifiedName~ParityFixtureMatrixTests
```
Live MXAccess execution remains opt-in. The matrix defines which scenarios to
run when the installed MXAccess COM component and provider state are available;
normal unit tests only validate the repository fixture shape.
## Related Documentation
- [Gateway Testing](./GatewayTesting.md)
- [MXAccess Worker Instance Detailed Design](./mxaccess-worker-instance-design.md)
- [Protobuf Contracts](./Contracts.md)
@@ -0,0 +1,293 @@
using System.Text.Json;
using MxGateway.Contracts;
namespace MxGateway.Tests.Contracts;
public sealed class ParityFixtureMatrixTests
{
[Fact]
public void Matrix_DeclaresCurrentProtocolVersionsAndComparisonFields()
{
using JsonDocument matrix = LoadParityMatrix();
JsonElement root = matrix.RootElement;
Assert.Equal(1, root.GetProperty("schemaVersion").GetInt32());
Assert.Equal("mxaccess-gateway-parity-fixture-matrix", root.GetProperty("fixtureSet").GetString());
Assert.Equal(GatewayContractInfo.GatewayProtocolVersion, root.GetProperty("gatewayProtocolVersion").GetUInt32());
Assert.Equal(GatewayContractInfo.WorkerProtocolVersion, root.GetProperty("workerProtocolVersion").GetUInt32());
JsonElement comparisonFormat = root.GetProperty("comparisonFormat");
AssertRequiredFields(
comparisonFormat.GetProperty("directMxAccess").GetProperty("requiredFields"),
"method",
"arguments",
"returnedValue",
"hresult",
"statuses",
"events");
AssertRequiredFields(
comparisonFormat.GetProperty("gatewayResult").GetProperty("requiredFields"),
"kind",
"protocolStatus",
"returnValue",
"hresult",
"statuses",
"events");
AssertRequiredFields(
comparisonFormat.GetProperty("eventFields"),
"family",
"value",
"quality",
"sourceTimestamp",
"statuses",
"workerSequence");
AssertRequiredFields(
comparisonFormat.GetProperty("comparisonKeys"),
"hresult",
"statusArrayShape",
"statusRawFields",
"eventFamilyOrder",
"eventPayloadShape",
"valueProjection",
"rawFallbackMetadata");
}
[Fact]
public void Matrix_CoversEveryPublicMxAccessMethod()
{
using JsonDocument matrix = LoadParityMatrix();
JsonElement methodFixtures = matrix.RootElement.GetProperty("methodFixtures");
Dictionary<string, JsonElement> fixturesByMethod = [];
HashSet<string> ids = new(StringComparer.Ordinal);
foreach (JsonElement fixture in methodFixtures.EnumerateArray())
{
string id = fixture.GetProperty("id").GetString()!;
string method = fixture.GetProperty("method").GetString()!;
string commandKind = fixture.GetProperty("commandKind").GetString()!;
string status = fixture.GetProperty("status").GetString()!;
Assert.True(ids.Add(id), $"Duplicate parity fixture id '{id}'.");
Assert.True(fixturesByMethod.TryAdd(method, fixture), $"Duplicate parity method '{method}'.");
Assert.StartsWith("MX_COMMAND_KIND_", commandKind, StringComparison.Ordinal);
Assert.Contains(status, KnownFixtureStatuses);
Assert.NotEmpty(fixture.GetProperty("assertions").EnumerateArray());
AssertCaptureReferencesAreRelative(fixture.GetProperty("captureReferences"));
}
Assert.Equal(ExpectedPublicMethods.Order(StringComparer.Ordinal), fixturesByMethod.Keys.Order(StringComparer.Ordinal));
foreach (string method in ExpectedPublicMethods)
{
JsonElement fixture = fixturesByMethod[method];
string status = fixture.GetProperty("status").GetString()!;
Assert.True(
status == "planned_fixture" || status == "documented_gap",
$"Method '{method}' must have a planned parity fixture or documented gap.");
}
}
[Fact]
public void Matrix_CoversRequiredParityScenarioGroups()
{
using JsonDocument matrix = LoadParityMatrix();
HashSet<string> knownFixtureIds = GetFixtureIds(matrix.RootElement);
Dictionary<string, JsonElement> groupsById = [];
foreach (JsonElement group in matrix.RootElement.GetProperty("scenarioGroups").EnumerateArray())
{
string id = group.GetProperty("id").GetString()!;
Assert.True(groupsById.TryAdd(id, group), $"Duplicate parity scenario group '{id}'.");
Assert.NotEmpty(group.GetProperty("description").GetString()!);
Assert.NotEmpty(group.GetProperty("fixtureIds").EnumerateArray());
AssertCaptureReferencesAreRelative(group.GetProperty("captureReferences"));
foreach (JsonElement fixtureIdElement in group.GetProperty("fixtureIds").EnumerateArray())
{
string fixtureId = fixtureIdElement.GetString()!;
Assert.Contains(fixtureId, knownFixtureIds);
}
}
foreach (string requiredGroup in RequiredScenarioGroups)
{
Assert.True(groupsById.ContainsKey(requiredGroup), $"Missing required parity scenario group '{requiredGroup}'.");
}
AssertScenarioCovers(groupsById["invalid_handles"], "method.remove-item.basic", "method.write.value-status-matrix");
AssertScenarioCovers(groupsById["write_statuses"], "method.write.value-status-matrix", "event.on-write-complete.status");
AssertScenarioCovers(groupsById["secured_writes"], "method.write-secured.rejection-gap", "method.write-secured2.authenticated");
AssertScenarioCovers(groupsById["add_item_context"], "method.add-item2.context", "method.add-buffered-item.context");
AssertScenarioCovers(groupsById["buffered_registration"], "method.add-buffered-item.context", "event.on-buffered-data-change.batch-gap");
}
[Fact]
public void Matrix_CoversEveryPublicMxAccessEventFamily()
{
using JsonDocument matrix = LoadParityMatrix();
Dictionary<string, JsonElement> fixturesByFamily = [];
foreach (JsonElement fixture in matrix.RootElement.GetProperty("eventFixtures").EnumerateArray())
{
string family = fixture.GetProperty("family").GetString()!;
string status = fixture.GetProperty("status").GetString()!;
Assert.True(fixturesByFamily.TryAdd(family, fixture), $"Duplicate parity event family '{family}'.");
Assert.Contains(status, KnownFixtureStatuses);
Assert.NotEmpty(fixture.GetProperty("assertions").EnumerateArray());
AssertCaptureReferencesAreRelative(fixture.GetProperty("captureReferences"));
}
foreach (string eventFamily in ExpectedEventFamilies)
{
Assert.True(fixturesByFamily.ContainsKey(eventFamily), $"Missing parity fixture for event family '{eventFamily}'.");
}
Assert.Equal("documented_gap", fixturesByFamily["MX_EVENT_FAMILY_OPERATION_COMPLETE"].GetProperty("status").GetString());
Assert.Equal("documented_gap", fixturesByFamily["MX_EVENT_FAMILY_ON_BUFFERED_DATA_CHANGE"].GetProperty("status").GetString());
}
private static readonly string[] ExpectedPublicMethods =
[
"Register",
"Unregister",
"AddItem",
"AddItem2",
"RemoveItem",
"Advise",
"UnAdvise",
"AdviseSupervisory",
"AddBufferedItem",
"SetBufferedUpdateInterval",
"Suspend",
"Activate",
"Write",
"Write2",
"WriteSecured",
"WriteSecured2",
"AuthenticateUser",
"ArchestrAUserToId",
];
private static readonly string[] ExpectedEventFamilies =
[
"MX_EVENT_FAMILY_ON_DATA_CHANGE",
"MX_EVENT_FAMILY_ON_WRITE_COMPLETE",
"MX_EVENT_FAMILY_OPERATION_COMPLETE",
"MX_EVENT_FAMILY_ON_BUFFERED_DATA_CHANGE",
];
private static readonly string[] RequiredScenarioGroups =
[
"invalid_handles",
"write_statuses",
"secured_writes",
"add_item_context",
"buffered_registration",
];
private static readonly string[] KnownFixtureStatuses =
[
"planned_fixture",
"documented_gap",
];
private static void AssertRequiredFields(
JsonElement fields,
params string[] expectedFields)
{
HashSet<string> declared = fields
.EnumerateArray()
.Select(field => field.GetString()!)
.ToHashSet(StringComparer.Ordinal);
foreach (string expectedField in expectedFields)
{
Assert.Contains(expectedField, declared);
}
}
private static void AssertCaptureReferencesAreRelative(JsonElement captureReferences)
{
int count = 0;
foreach (JsonElement captureReference in captureReferences.EnumerateArray())
{
string path = captureReference.GetString()!;
Assert.StartsWith("captures/", path, StringComparison.Ordinal);
Assert.DoesNotContain("\\", path, StringComparison.Ordinal);
Assert.False(Path.IsPathRooted(path), $"Capture reference '{path}' must be relative.");
count++;
}
Assert.True(count > 0, "Each parity fixture must reference at least one MXAccess capture.");
}
private static void AssertScenarioCovers(
JsonElement group,
params string[] fixtureIds)
{
HashSet<string> declared = group
.GetProperty("fixtureIds")
.EnumerateArray()
.Select(fixtureId => fixtureId.GetString()!)
.ToHashSet(StringComparer.Ordinal);
foreach (string fixtureId in fixtureIds)
{
Assert.Contains(fixtureId, declared);
}
}
private static HashSet<string> GetFixtureIds(JsonElement root)
{
HashSet<string> ids = new(StringComparer.Ordinal);
foreach (JsonElement fixture in root.GetProperty("methodFixtures").EnumerateArray())
{
ids.Add(fixture.GetProperty("id").GetString()!);
}
foreach (JsonElement fixture in root.GetProperty("eventFixtures").EnumerateArray())
{
ids.Add(fixture.GetProperty("id").GetString()!);
}
return ids;
}
private static JsonDocument LoadParityMatrix()
{
return JsonDocument.Parse(File.ReadAllText(Path.Combine(GetParityFixtureRoot().FullName, "parity-fixture-matrix.json")));
}
private static DirectoryInfo GetParityFixtureRoot()
{
DirectoryInfo repositoryRoot = FindRepositoryRoot();
return new DirectoryInfo(Path.Combine(repositoryRoot.FullName, "clients", "proto", "fixtures", "parity"));
}
private static DirectoryInfo FindRepositoryRoot()
{
DirectoryInfo? current = new(AppContext.BaseDirectory);
while (current is not null)
{
if (File.Exists(Path.Combine(current.FullName, "AGENTS.md"))
&& Directory.Exists(Path.Combine(current.FullName, "src"))
&& Directory.Exists(Path.Combine(current.FullName, "clients")))
{
return current;
}
current = current.Parent;
}
throw new DirectoryNotFoundException("Could not locate the repository root from the test output directory.");
}
}