Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d6939432f9 | |||
| 02143ef7e2 | |||
| c032852065 | |||
| 1d93e77234 | |||
| 0a670eb381 | |||
| b57662aae7 | |||
| 14afb325c3 | |||
| af42891d5a | |||
| 01a51df053 |
@@ -0,0 +1,96 @@
|
||||
# Java Client
|
||||
|
||||
The Java client workspace contains the MXAccess Gateway client library,
|
||||
generated protobuf/gRPC bindings, a Picocli test CLI project, and JUnit tests.
|
||||
|
||||
## Layout
|
||||
|
||||
```text
|
||||
clients/java/
|
||||
settings.gradle
|
||||
build.gradle
|
||||
src/main/generated/
|
||||
mxgateway-client/
|
||||
mxgateway-cli/
|
||||
```
|
||||
|
||||
`mxgateway-client` generates Java protobuf and gRPC sources from
|
||||
`../../src/MxGateway.Contracts/Protos`. The Gradle protobuf plugin writes those
|
||||
generated sources under `src/main/generated`, which matches the client proto
|
||||
manifest in `../proto/proto-inputs.json`. Do not edit generated files by hand.
|
||||
|
||||
`mxgateway-client` exposes `MxGatewayClientOptions`, `MxGatewayClient`,
|
||||
`MxGatewaySession`, value/status helpers, typed gateway exceptions, raw
|
||||
generated stubs, and generated protobuf messages for parity tests.
|
||||
|
||||
`mxgateway-cli` depends on `mxgateway-client` and provides the `mxgw-java`
|
||||
application entry point. The CLI supports version, session, command, event
|
||||
streaming, write, and smoke-test commands with deterministic JSON output.
|
||||
|
||||
## Client Usage
|
||||
|
||||
Create a client with explicit transport and auth options:
|
||||
|
||||
```java
|
||||
MxGatewayClientOptions options = MxGatewayClientOptions.builder()
|
||||
.endpoint("localhost:5000")
|
||||
.apiKey(System.getenv("MXGATEWAY_API_KEY"))
|
||||
.plaintext(true)
|
||||
.build();
|
||||
|
||||
try (MxGatewayClient client = MxGatewayClient.connect(options);
|
||||
MxGatewaySession session = client.openSession("java-client")) {
|
||||
int serverHandle = session.register("java-client");
|
||||
int itemHandle = session.addItem(serverHandle, "TestObject.TestInt");
|
||||
session.advise(serverHandle, itemHandle);
|
||||
session.write(serverHandle, itemHandle, MxValues.int32Value(123), 0);
|
||||
}
|
||||
```
|
||||
|
||||
Use `rawBlockingStub`, `rawFutureStub`, `rawAsyncStub`, `openSessionRaw`,
|
||||
`closeSessionRaw`, `invoke`, and raw session helper methods when tests need the
|
||||
underlying protobuf messages. `MxGatewayCommandException` and
|
||||
`MxAccessException` preserve the raw `MxCommandReply` when the gateway returns a
|
||||
data-bearing MXAccess failure.
|
||||
|
||||
`MxEventStream` implements `Iterator<MxEvent>` and `AutoCloseable`. Closing it
|
||||
cancels the underlying gRPC stream. Canceling or timing out a Java client call
|
||||
only stops the client from waiting; it does not abort an in-flight MXAccess COM
|
||||
call on the worker STA.
|
||||
|
||||
## CLI Usage
|
||||
|
||||
Run the CLI through Gradle:
|
||||
|
||||
```powershell
|
||||
gradle :mxgateway-cli:run --args="version --json"
|
||||
gradle :mxgateway-cli:run --args="open-session --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --client-session-name java-cli --json"
|
||||
gradle :mxgateway-cli:run --args="register --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --session-id <id> --client-name java-cli --json"
|
||||
gradle :mxgateway-cli:run --args="add-item --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --session-id <id> --server-handle 1 --item TestObject.TestInt --json"
|
||||
gradle :mxgateway-cli:run --args="advise --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --session-id <id> --server-handle 1 --item-handle 1 --json"
|
||||
gradle :mxgateway-cli:run --args="write --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --session-id <id> --server-handle 1 --item-handle 1 --type int32 --value 123 --json"
|
||||
gradle :mxgateway-cli:run --args="stream-events --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --session-id <id> --limit 1 --json"
|
||||
gradle :mxgateway-cli:run --args="smoke --endpoint localhost:5000 --api-key-env MXGATEWAY_API_KEY --plaintext --item TestObject.TestInt --json"
|
||||
```
|
||||
|
||||
The CLI accepts `--api-key`, `--api-key-env`, `--plaintext`, `--ca-file`,
|
||||
`--server-name-override`, `--timeout`, and `--json` on gateway commands. JSON
|
||||
output redacts API keys.
|
||||
|
||||
## Build And Test
|
||||
|
||||
Run the Java checks from `clients/java`:
|
||||
|
||||
```powershell
|
||||
gradle test
|
||||
```
|
||||
|
||||
The build uses the Java 21 Gradle toolchain, compiles generated protobuf/gRPC
|
||||
code, and runs JUnit 5 tests for the client wrapper, shared behavior fixtures,
|
||||
in-process gRPC behavior, stream cancellation, and CLI parser/output behavior.
|
||||
|
||||
## Related Documentation
|
||||
|
||||
- [Client Proto Generation](../../docs/client-proto-generation.md)
|
||||
- [Java Client Detailed Design](../../docs/clients-java-design.md)
|
||||
- [Java Style Guide](../../docs/style-guides/JavaStyleGuide.md)
|
||||
@@ -0,0 +1,40 @@
|
||||
plugins {
|
||||
id 'base'
|
||||
}
|
||||
|
||||
ext {
|
||||
guavaVersion = '33.5.0-jre'
|
||||
gsonVersion = '2.13.2'
|
||||
grpcVersion = '1.76.0'
|
||||
junitVersion = '5.14.1'
|
||||
picocliVersion = '4.7.7'
|
||||
protobufVersion = '4.33.1'
|
||||
}
|
||||
|
||||
subprojects {
|
||||
group = 'com.dohertylan.mxgateway'
|
||||
version = '0.1.0'
|
||||
|
||||
pluginManager.withPlugin('java') {
|
||||
java {
|
||||
toolchain {
|
||||
languageVersion = JavaLanguageVersion.of(21)
|
||||
}
|
||||
}
|
||||
|
||||
tasks.withType(JavaCompile).configureEach {
|
||||
options.encoding = 'UTF-8'
|
||||
options.release = 21
|
||||
}
|
||||
|
||||
tasks.withType(Test).configureEach {
|
||||
useJUnitPlatform()
|
||||
}
|
||||
|
||||
dependencies {
|
||||
testImplementation platform("org.junit:junit-bom:${junitVersion}")
|
||||
testImplementation 'org.junit.jupiter:junit-jupiter'
|
||||
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
plugins {
|
||||
id 'application'
|
||||
}
|
||||
|
||||
dependencies {
|
||||
implementation project(':mxgateway-client')
|
||||
implementation "com.google.protobuf:protobuf-java-util:${protobufVersion}"
|
||||
implementation "info.picocli:picocli:${picocliVersion}"
|
||||
}
|
||||
|
||||
application {
|
||||
mainClass = 'com.dohertylan.mxgateway.cli.MxGatewayCli'
|
||||
}
|
||||
+644
@@ -0,0 +1,644 @@
|
||||
package com.dohertylan.mxgateway.cli;
|
||||
|
||||
import com.dohertylan.mxgateway.client.MxEventStream;
|
||||
import com.dohertylan.mxgateway.client.MxGatewayClient;
|
||||
import com.dohertylan.mxgateway.client.MxGatewayClientOptions;
|
||||
import com.dohertylan.mxgateway.client.MxGatewayClientVersion;
|
||||
import com.dohertylan.mxgateway.client.MxGatewaySecrets;
|
||||
import com.dohertylan.mxgateway.client.MxGatewaySession;
|
||||
import com.dohertylan.mxgateway.client.MxValues;
|
||||
import com.google.protobuf.Message;
|
||||
import com.google.protobuf.util.JsonFormat;
|
||||
import java.io.PrintWriter;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Callable;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxValue;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest;
|
||||
import picocli.CommandLine;
|
||||
import picocli.CommandLine.Command;
|
||||
import picocli.CommandLine.Mixin;
|
||||
import picocli.CommandLine.Model.CommandSpec;
|
||||
import picocli.CommandLine.Option;
|
||||
import picocli.CommandLine.Spec;
|
||||
|
||||
@Command(
|
||||
name = "mxgw-java",
|
||||
mixinStandardHelpOptions = true,
|
||||
description = "MXAccess Gateway Java test CLI.")
|
||||
public final class MxGatewayCli implements Callable<Integer> {
|
||||
private final MxGatewayCliClientFactory clientFactory;
|
||||
|
||||
@Spec
|
||||
private CommandSpec spec;
|
||||
|
||||
public MxGatewayCli() {
|
||||
this(new GrpcMxGatewayCliClientFactory());
|
||||
}
|
||||
|
||||
MxGatewayCli(MxGatewayCliClientFactory clientFactory) {
|
||||
this.clientFactory = clientFactory;
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
int exitCode = commandLine(new GrpcMxGatewayCliClientFactory()).execute(args);
|
||||
System.exit(exitCode);
|
||||
}
|
||||
|
||||
public static int execute(PrintWriter out, PrintWriter err, String... args) {
|
||||
return execute(new GrpcMxGatewayCliClientFactory(), out, err, args);
|
||||
}
|
||||
|
||||
static int execute(MxGatewayCliClientFactory clientFactory, PrintWriter out, PrintWriter err, String... args) {
|
||||
CommandLine commandLine = commandLine(clientFactory);
|
||||
commandLine.setOut(out);
|
||||
commandLine.setErr(err);
|
||||
return commandLine.execute(args);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer call() {
|
||||
spec.commandLine().usage(spec.commandLine().getOut());
|
||||
return 0;
|
||||
}
|
||||
|
||||
private static CommandLine commandLine(MxGatewayCliClientFactory clientFactory) {
|
||||
CommandLine commandLine = new CommandLine(new MxGatewayCli(clientFactory));
|
||||
commandLine.addSubcommand("version", new VersionCommand());
|
||||
commandLine.addSubcommand("open-session", new OpenSessionCommand(clientFactory));
|
||||
commandLine.addSubcommand("close-session", new CloseSessionCommand(clientFactory));
|
||||
commandLine.addSubcommand("register", new RegisterCommand(clientFactory));
|
||||
commandLine.addSubcommand("add-item", new AddItemCommand(clientFactory));
|
||||
commandLine.addSubcommand("advise", new AdviseCommand(clientFactory));
|
||||
commandLine.addSubcommand("write", new WriteCommand(clientFactory));
|
||||
commandLine.addSubcommand("stream-events", new StreamEventsCommand(clientFactory));
|
||||
commandLine.addSubcommand("smoke", new SmokeCommand(clientFactory));
|
||||
return commandLine;
|
||||
}
|
||||
|
||||
@Command(name = "version", description = "Prints the Java client version.")
|
||||
public static final class VersionCommand implements Callable<Integer> {
|
||||
@Spec
|
||||
private CommandSpec spec;
|
||||
|
||||
@Option(names = "--json", description = "Write JSON output.")
|
||||
private boolean json;
|
||||
|
||||
@Override
|
||||
public Integer call() {
|
||||
Map<String, Object> values = new LinkedHashMap<>();
|
||||
values.put("clientVersion", MxGatewayClientVersion.clientVersion());
|
||||
values.put("gatewayProtocolVersion", MxGatewayClientVersion.gatewayProtocolVersion());
|
||||
values.put("workerProtocolVersion", MxGatewayClientVersion.workerProtocolVersion());
|
||||
if (json) {
|
||||
spec.commandLine().getOut().println(jsonObject(values));
|
||||
return 0;
|
||||
}
|
||||
|
||||
spec.commandLine()
|
||||
.getOut()
|
||||
.printf(
|
||||
"mxgateway-java %s gatewayProtocolVersion=%d workerProtocolVersion=%d%n",
|
||||
MxGatewayClientVersion.clientVersion(),
|
||||
MxGatewayClientVersion.gatewayProtocolVersion(),
|
||||
MxGatewayClientVersion.workerProtocolVersion());
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
abstract static class GatewayCommand implements Callable<Integer> {
|
||||
final MxGatewayCliClientFactory clientFactory;
|
||||
|
||||
@Mixin
|
||||
CommonOptions common = new CommonOptions();
|
||||
|
||||
@Option(names = "--json", description = "Write JSON output.")
|
||||
boolean json;
|
||||
|
||||
GatewayCommand(MxGatewayCliClientFactory clientFactory) {
|
||||
this.clientFactory = clientFactory;
|
||||
}
|
||||
}
|
||||
|
||||
@Command(name = "open-session", description = "Opens a gateway session.")
|
||||
static final class OpenSessionCommand extends GatewayCommand {
|
||||
@Option(names = "--client-session-name", description = "Client session name.")
|
||||
String clientSessionName = "";
|
||||
|
||||
@Option(names = "--backend", description = "Requested gateway backend.")
|
||||
String backend = "";
|
||||
|
||||
OpenSessionCommand(MxGatewayCliClientFactory clientFactory) {
|
||||
super(clientFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer call() {
|
||||
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
|
||||
var reply = client.openSession(OpenSessionRequest.newBuilder()
|
||||
.setClientSessionName(clientSessionName)
|
||||
.setRequestedBackend(backend)
|
||||
.build());
|
||||
writeOutput("open-session", common, json, reply, () -> reply.getSessionId());
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
@Command(name = "close-session", description = "Closes a gateway session.")
|
||||
static final class CloseSessionCommand extends GatewayCommand {
|
||||
@Option(names = "--session-id", required = true, description = "Gateway session id.")
|
||||
String sessionId;
|
||||
|
||||
CloseSessionCommand(MxGatewayCliClientFactory clientFactory) {
|
||||
super(clientFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer call() {
|
||||
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
|
||||
var reply = client.closeSession(CloseSessionRequest.newBuilder()
|
||||
.setSessionId(sessionId)
|
||||
.build());
|
||||
writeOutput("close-session", common, json, reply, () -> reply.getFinalState().name());
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
@Command(name = "register", description = "Invokes MXAccess Register.")
|
||||
static final class RegisterCommand extends GatewayCommand {
|
||||
@Option(names = "--session-id", required = true, description = "Gateway session id.")
|
||||
String sessionId;
|
||||
|
||||
@Option(names = "--client-name", required = true, description = "MXAccess client name.")
|
||||
String clientName;
|
||||
|
||||
RegisterCommand(MxGatewayCliClientFactory clientFactory) {
|
||||
super(clientFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer call() {
|
||||
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
|
||||
MxCommandReply reply = client.session(sessionId).registerRaw(clientName);
|
||||
writeOutput("register", common, json, reply, () -> reply.getKind().name());
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
@Command(name = "add-item", description = "Invokes MXAccess AddItem.")
|
||||
static final class AddItemCommand extends GatewayCommand {
|
||||
@Option(names = "--session-id", required = true, description = "Gateway session id.")
|
||||
String sessionId;
|
||||
|
||||
@Option(names = "--server-handle", required = true, description = "MXAccess server handle.")
|
||||
int serverHandle;
|
||||
|
||||
@Option(names = "--item", required = true, description = "Item definition.")
|
||||
String item;
|
||||
|
||||
AddItemCommand(MxGatewayCliClientFactory clientFactory) {
|
||||
super(clientFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer call() {
|
||||
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
|
||||
MxCommandReply reply = client.session(sessionId).addItemRaw(serverHandle, item);
|
||||
writeOutput("add-item", common, json, reply, () -> reply.getKind().name());
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
@Command(name = "advise", description = "Invokes MXAccess Advise.")
|
||||
static final class AdviseCommand extends GatewayCommand {
|
||||
@Option(names = "--session-id", required = true, description = "Gateway session id.")
|
||||
String sessionId;
|
||||
|
||||
@Option(names = "--server-handle", required = true, description = "MXAccess server handle.")
|
||||
int serverHandle;
|
||||
|
||||
@Option(names = "--item-handle", required = true, description = "MXAccess item handle.")
|
||||
int itemHandle;
|
||||
|
||||
AdviseCommand(MxGatewayCliClientFactory clientFactory) {
|
||||
super(clientFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer call() {
|
||||
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
|
||||
MxCommandReply reply = client.session(sessionId).adviseRaw(serverHandle, itemHandle);
|
||||
writeOutput("advise", common, json, reply, () -> reply.getKind().name());
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
@Command(name = "write", description = "Invokes MXAccess Write.")
|
||||
static final class WriteCommand extends GatewayCommand {
|
||||
@Option(names = "--session-id", required = true, description = "Gateway session id.")
|
||||
String sessionId;
|
||||
|
||||
@Option(names = "--server-handle", required = true, description = "MXAccess server handle.")
|
||||
int serverHandle;
|
||||
|
||||
@Option(names = "--item-handle", required = true, description = "MXAccess item handle.")
|
||||
int itemHandle;
|
||||
|
||||
@Option(names = "--type", defaultValue = "string", description = "Value type.")
|
||||
String type;
|
||||
|
||||
@Option(names = "--value", required = true, description = "Value text.")
|
||||
String value;
|
||||
|
||||
@Option(names = "--user-id", defaultValue = "0", description = "MXAccess user id.")
|
||||
int userId;
|
||||
|
||||
WriteCommand(MxGatewayCliClientFactory clientFactory) {
|
||||
super(clientFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer call() {
|
||||
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
|
||||
MxCommandReply reply =
|
||||
client.session(sessionId).writeRaw(serverHandle, itemHandle, parseValue(type, value), userId);
|
||||
writeOutput("write", common, json, reply, () -> reply.getKind().name());
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
@Command(name = "stream-events", description = "Streams gateway events.")
|
||||
static final class StreamEventsCommand extends GatewayCommand {
|
||||
@Option(names = "--session-id", required = true, description = "Gateway session id.")
|
||||
String sessionId;
|
||||
|
||||
@Option(names = "--after-worker-sequence", defaultValue = "0", description = "Starting worker sequence.")
|
||||
long afterWorkerSequence;
|
||||
|
||||
@Option(names = "--limit", defaultValue = "0", description = "Maximum events to print.")
|
||||
int limit;
|
||||
|
||||
StreamEventsCommand(MxGatewayCliClientFactory clientFactory) {
|
||||
super(clientFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer call() {
|
||||
try (MxGatewayCliClient client = clientFactory.connect(common.resolved());
|
||||
MxEventStream events = client.session(sessionId).streamEventsAfter(afterWorkerSequence)) {
|
||||
int count = 0;
|
||||
while (events.hasNext()) {
|
||||
MxEvent event = events.next();
|
||||
if (json) {
|
||||
client.out().println(protoJson(event));
|
||||
} else {
|
||||
client.out().printf("%d %s%n", event.getWorkerSequence(), event.getFamily());
|
||||
}
|
||||
count++;
|
||||
if (limit > 0 && count >= limit) {
|
||||
events.close();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
@Command(name = "smoke", description = "Runs a bounded open/register/add/advise flow.")
|
||||
static final class SmokeCommand extends GatewayCommand {
|
||||
@Option(names = "--client-name", defaultValue = "mxgw-java-smoke", description = "MXAccess client name.")
|
||||
String clientName;
|
||||
|
||||
@Option(names = "--item", required = true, description = "Item definition.")
|
||||
String item;
|
||||
|
||||
SmokeCommand(MxGatewayCliClientFactory clientFactory) {
|
||||
super(clientFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer call() {
|
||||
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
|
||||
var session = client.openSession(OpenSessionRequest.newBuilder()
|
||||
.setClientSessionName(clientName)
|
||||
.build());
|
||||
MxGatewayCliSession cliSession = client.session(session.getSessionId());
|
||||
int serverHandle = cliSession.register(clientName);
|
||||
int itemHandle = cliSession.addItem(serverHandle, item);
|
||||
cliSession.advise(serverHandle, itemHandle);
|
||||
if (json) {
|
||||
Map<String, Object> output = new LinkedHashMap<>();
|
||||
output.put("command", "smoke");
|
||||
output.put("options", common.redactedJsonMap());
|
||||
output.put("sessionId", session.getSessionId());
|
||||
output.put("serverHandle", serverHandle);
|
||||
output.put("itemHandle", itemHandle);
|
||||
client.out().println(jsonObject(output));
|
||||
} else {
|
||||
client.out().printf(
|
||||
"session=%s server=%d item=%d%n", session.getSessionId(), serverHandle, itemHandle);
|
||||
}
|
||||
client.closeSession(CloseSessionRequest.newBuilder()
|
||||
.setSessionId(session.getSessionId())
|
||||
.build());
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
static final class CommonOptions {
|
||||
@Spec
|
||||
CommandSpec spec;
|
||||
|
||||
@Option(names = "--endpoint", defaultValue = "localhost:5000", description = "Gateway endpoint.")
|
||||
String endpoint;
|
||||
|
||||
@Option(names = "--api-key", description = "Gateway API key.")
|
||||
String apiKey = "";
|
||||
|
||||
@Option(names = "--api-key-env", defaultValue = "MXGATEWAY_API_KEY", description = "API key environment variable.")
|
||||
String apiKeyEnv;
|
||||
|
||||
@Option(names = "--plaintext", description = "Use plaintext transport.")
|
||||
boolean plaintext;
|
||||
|
||||
@Option(names = "--ca-file", description = "CA certificate file.")
|
||||
Path caFile;
|
||||
|
||||
@Option(names = "--server-name-override", description = "TLS server name override.")
|
||||
String serverNameOverride = "";
|
||||
|
||||
@Option(names = "--timeout", defaultValue = "30s", description = "Per-call timeout.")
|
||||
String timeout;
|
||||
|
||||
private String resolvedApiKey = "";
|
||||
private Duration resolvedTimeout = Duration.ofSeconds(30);
|
||||
|
||||
CommonOptions resolved() {
|
||||
resolvedApiKey = apiKey == null || apiKey.isBlank() ? System.getenv(apiKeyEnv) : apiKey;
|
||||
if (resolvedApiKey == null) {
|
||||
resolvedApiKey = "";
|
||||
}
|
||||
resolvedTimeout = parseDuration(timeout);
|
||||
return this;
|
||||
}
|
||||
|
||||
MxGatewayClientOptions toClientOptions() {
|
||||
return MxGatewayClientOptions.builder()
|
||||
.endpoint(endpoint)
|
||||
.apiKey(resolvedApiKey)
|
||||
.plaintext(plaintext)
|
||||
.caCertificatePath(caFile)
|
||||
.serverNameOverride(serverNameOverride)
|
||||
.callTimeout(resolvedTimeout)
|
||||
.build();
|
||||
}
|
||||
|
||||
Map<String, Object> redactedJsonMap() {
|
||||
Map<String, Object> values = new LinkedHashMap<>();
|
||||
values.put("endpoint", endpoint);
|
||||
values.put("apiKey", MxGatewaySecrets.redactApiKey(resolvedApiKey));
|
||||
values.put("apiKeyEnv", apiKeyEnv);
|
||||
values.put("plaintext", plaintext);
|
||||
values.put("caFile", caFile == null ? "" : caFile.toString());
|
||||
values.put("serverNameOverride", serverNameOverride);
|
||||
values.put("timeout", timeout);
|
||||
return values;
|
||||
}
|
||||
}
|
||||
|
||||
interface MxGatewayCliClientFactory {
|
||||
MxGatewayCliClient connect(CommonOptions options);
|
||||
}
|
||||
|
||||
interface MxGatewayCliClient extends AutoCloseable {
|
||||
PrintWriter out();
|
||||
|
||||
mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply openSession(OpenSessionRequest request);
|
||||
|
||||
mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply closeSession(CloseSessionRequest request);
|
||||
|
||||
MxGatewayCliSession session(String sessionId);
|
||||
|
||||
@Override
|
||||
void close();
|
||||
}
|
||||
|
||||
interface MxGatewayCliSession {
|
||||
int register(String clientName);
|
||||
|
||||
MxCommandReply registerRaw(String clientName);
|
||||
|
||||
int addItem(int serverHandle, String itemDefinition);
|
||||
|
||||
MxCommandReply addItemRaw(int serverHandle, String itemDefinition);
|
||||
|
||||
void advise(int serverHandle, int itemHandle);
|
||||
|
||||
MxCommandReply adviseRaw(int serverHandle, int itemHandle);
|
||||
|
||||
MxCommandReply writeRaw(int serverHandle, int itemHandle, MxValue value, int userId);
|
||||
|
||||
MxEventStream streamEventsAfter(long afterWorkerSequence);
|
||||
}
|
||||
|
||||
static final class GrpcMxGatewayCliClientFactory implements MxGatewayCliClientFactory {
|
||||
@Override
|
||||
public MxGatewayCliClient connect(CommonOptions options) {
|
||||
return new GrpcMxGatewayCliClient(MxGatewayClient.connect(options.toClientOptions()), options.spec.commandLine().getOut());
|
||||
}
|
||||
}
|
||||
|
||||
static final class GrpcMxGatewayCliClient implements MxGatewayCliClient {
|
||||
private final MxGatewayClient client;
|
||||
private final PrintWriter out;
|
||||
|
||||
GrpcMxGatewayCliClient(MxGatewayClient client, PrintWriter out) {
|
||||
this.client = client;
|
||||
this.out = out;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PrintWriter out() {
|
||||
return out;
|
||||
}
|
||||
|
||||
@Override
|
||||
public mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply openSession(OpenSessionRequest request) {
|
||||
return client.openSessionRaw(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply closeSession(CloseSessionRequest request) {
|
||||
return client.closeSessionRaw(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MxGatewayCliSession session(String sessionId) {
|
||||
return new GrpcMxGatewayCliSession(MxGatewaySession.forSessionId(client, sessionId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
client.close();
|
||||
}
|
||||
}
|
||||
|
||||
record GrpcMxGatewayCliSession(MxGatewaySession session) implements MxGatewayCliSession {
|
||||
@Override
|
||||
public int register(String clientName) {
|
||||
return session.register(clientName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MxCommandReply registerRaw(String clientName) {
|
||||
return session.registerRaw(clientName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int addItem(int serverHandle, String itemDefinition) {
|
||||
return session.addItem(serverHandle, itemDefinition);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MxCommandReply addItemRaw(int serverHandle, String itemDefinition) {
|
||||
return session.addItemRaw(serverHandle, itemDefinition);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void advise(int serverHandle, int itemHandle) {
|
||||
session.advise(serverHandle, itemHandle);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MxCommandReply adviseRaw(int serverHandle, int itemHandle) {
|
||||
return session.adviseRaw(serverHandle, itemHandle);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MxCommandReply writeRaw(int serverHandle, int itemHandle, MxValue value, int userId) {
|
||||
return session.writeRaw(serverHandle, itemHandle, value, userId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MxEventStream streamEventsAfter(long afterWorkerSequence) {
|
||||
return session.streamEventsAfter(afterWorkerSequence);
|
||||
}
|
||||
}
|
||||
|
||||
interface TextSupplier {
|
||||
String get();
|
||||
}
|
||||
|
||||
private static void writeOutput(
|
||||
String command, CommonOptions common, boolean json, Message reply, TextSupplier textSupplier) {
|
||||
PrintWriter out = common.spec.commandLine().getOut();
|
||||
if (json) {
|
||||
Map<String, Object> output = new LinkedHashMap<>();
|
||||
output.put("command", command);
|
||||
output.put("options", common.redactedJsonMap());
|
||||
output.put("reply", new RawJson(protoJson(reply)));
|
||||
out.println(jsonObject(output));
|
||||
return;
|
||||
}
|
||||
out.println(textSupplier.get());
|
||||
}
|
||||
|
||||
private static MxValue parseValue(String type, String text) {
|
||||
return switch (type) {
|
||||
case "bool" -> MxValues.boolValue(Boolean.parseBoolean(text));
|
||||
case "int32" -> MxValues.int32Value(Integer.parseInt(text));
|
||||
case "int64" -> MxValues.int64Value(Long.parseLong(text));
|
||||
case "float" -> MxValues.floatValue(Float.parseFloat(text));
|
||||
case "double" -> MxValues.doubleValue(Double.parseDouble(text));
|
||||
case "string" -> MxValues.stringValue(text);
|
||||
default -> throw new IllegalArgumentException("unsupported value type " + type);
|
||||
};
|
||||
}
|
||||
|
||||
private static Duration parseDuration(String value) {
|
||||
if (value == null || value.isBlank()) {
|
||||
return Duration.ofSeconds(30);
|
||||
}
|
||||
if (value.startsWith("P")) {
|
||||
return Duration.parse(value);
|
||||
}
|
||||
if (value.endsWith("ms")) {
|
||||
return Duration.ofMillis(Long.parseLong(value.substring(0, value.length() - 2)));
|
||||
}
|
||||
if (value.endsWith("s")) {
|
||||
return Duration.ofSeconds(Long.parseLong(value.substring(0, value.length() - 1)));
|
||||
}
|
||||
if (value.endsWith("m")) {
|
||||
return Duration.ofMinutes(Long.parseLong(value.substring(0, value.length() - 1)));
|
||||
}
|
||||
return Duration.parse(value);
|
||||
}
|
||||
|
||||
private static String protoJson(Message message) {
|
||||
try {
|
||||
return JsonFormat.printer().omittingInsignificantWhitespace().print(message);
|
||||
} catch (Exception error) {
|
||||
throw new IllegalStateException("failed to write protobuf JSON", error);
|
||||
}
|
||||
}
|
||||
|
||||
private static String jsonObject(Map<String, Object> values) {
|
||||
StringBuilder builder = new StringBuilder();
|
||||
builder.append('{');
|
||||
boolean first = true;
|
||||
for (Map.Entry<String, Object> entry : values.entrySet()) {
|
||||
if (!first) {
|
||||
builder.append(',');
|
||||
}
|
||||
first = false;
|
||||
builder.append(jsonString(entry.getKey())).append(':').append(jsonValue(entry.getValue()));
|
||||
}
|
||||
builder.append('}');
|
||||
return builder.toString();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static String jsonValue(Object value) {
|
||||
if (value == null) {
|
||||
return "null";
|
||||
}
|
||||
if (value instanceof RawJson rawJson) {
|
||||
return rawJson.value();
|
||||
}
|
||||
if (value instanceof String string) {
|
||||
return jsonString(string);
|
||||
}
|
||||
if (value instanceof Number || value instanceof Boolean) {
|
||||
return value.toString();
|
||||
}
|
||||
if (value instanceof Map<?, ?> map) {
|
||||
return jsonObject((Map<String, Object>) map);
|
||||
}
|
||||
return jsonString(value.toString());
|
||||
}
|
||||
|
||||
private static String jsonString(String value) {
|
||||
return '"'
|
||||
+ value.replace("\\", "\\\\")
|
||||
.replace("\"", "\\\"")
|
||||
.replace("\r", "\\r")
|
||||
.replace("\n", "\\n")
|
||||
+ '"';
|
||||
}
|
||||
|
||||
private record RawJson(String value) {
|
||||
}
|
||||
}
|
||||
+241
@@ -0,0 +1,241 @@
|
||||
package com.dohertylan.mxgateway.cli;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.AddItemReply;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxCommandKind;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxValue;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatus;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.RegisterReply;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.SessionState;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
final class MxGatewayCliTests {
|
||||
@Test
|
||||
void versionCommandPrintsProtocolVersions() {
|
||||
CliRun run = execute(new FakeClientFactory(), "version");
|
||||
|
||||
assertEquals(0, run.exitCode());
|
||||
assertEquals("", run.errors());
|
||||
assertTrue(run.output().contains("mxgateway-java 0.1.0"));
|
||||
assertTrue(run.output().contains("gatewayProtocolVersion=1"));
|
||||
assertTrue(run.output().contains("workerProtocolVersion=1"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void versionCommandPrintsJson() {
|
||||
CliRun run = execute(new FakeClientFactory(), "version", "--json");
|
||||
|
||||
assertEquals(0, run.exitCode());
|
||||
assertTrue(run.output().contains("\"clientVersion\":\"0.1.0\""));
|
||||
assertTrue(run.output().contains("\"gatewayProtocolVersion\":1"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void openSessionJsonRedactsApiKey() {
|
||||
CliRun run = execute(
|
||||
new FakeClientFactory(),
|
||||
"open-session",
|
||||
"--endpoint",
|
||||
"localhost:5000",
|
||||
"--api-key",
|
||||
"mxgw_visible_secret",
|
||||
"--plaintext",
|
||||
"--client-session-name",
|
||||
"java-cli",
|
||||
"--json");
|
||||
|
||||
assertEquals(0, run.exitCode());
|
||||
assertTrue(run.output().contains("\"command\":\"open-session\""));
|
||||
assertTrue(run.output().contains("\"sessionId\":\"session-cli\""));
|
||||
assertTrue(run.output().contains("mxgw***********cret"));
|
||||
assertFalse(run.output().contains("visible_secret"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void writeBuildsTypedValueFromParserOptions() {
|
||||
FakeClientFactory factory = new FakeClientFactory();
|
||||
CliRun run = execute(
|
||||
factory,
|
||||
"write",
|
||||
"--session-id",
|
||||
"session-cli",
|
||||
"--server-handle",
|
||||
"12",
|
||||
"--item-handle",
|
||||
"34",
|
||||
"--type",
|
||||
"int32",
|
||||
"--value",
|
||||
"123",
|
||||
"--json");
|
||||
|
||||
assertEquals(0, run.exitCode());
|
||||
assertEquals(123, factory.client.session.lastWriteValue.getInt32Value());
|
||||
assertTrue(run.output().contains("\"kind\":\"MX_COMMAND_KIND_WRITE\""));
|
||||
}
|
||||
|
||||
@Test
|
||||
void smokeCommandRunsOpenRegisterAddAdviseAndClose() {
|
||||
FakeClientFactory factory = new FakeClientFactory();
|
||||
CliRun run = execute(factory, "smoke", "--item", "TestObject.TestInt", "--json");
|
||||
|
||||
assertEquals(0, run.exitCode());
|
||||
assertTrue(factory.client.session.registerCalled);
|
||||
assertTrue(factory.client.session.addItemCalled);
|
||||
assertTrue(factory.client.session.adviseCalled);
|
||||
assertTrue(factory.client.closeCalled);
|
||||
assertTrue(run.output().contains("\"serverHandle\":42"));
|
||||
assertTrue(run.output().contains("\"itemHandle\":7"));
|
||||
}
|
||||
|
||||
private static CliRun execute(MxGatewayCli.MxGatewayCliClientFactory factory, String... args) {
|
||||
StringWriter output = new StringWriter();
|
||||
StringWriter errors = new StringWriter();
|
||||
int exitCode = MxGatewayCli.execute(
|
||||
factory,
|
||||
new PrintWriter(output, true),
|
||||
new PrintWriter(errors, true),
|
||||
args);
|
||||
return new CliRun(exitCode, output.toString(), errors.toString());
|
||||
}
|
||||
|
||||
private record CliRun(int exitCode, String output, String errors) {
|
||||
}
|
||||
|
||||
private static final class FakeClientFactory implements MxGatewayCli.MxGatewayCliClientFactory {
|
||||
private FakeClient client;
|
||||
|
||||
@Override
|
||||
public MxGatewayCli.MxGatewayCliClient connect(MxGatewayCli.CommonOptions options) {
|
||||
client = new FakeClient(options.spec.commandLine().getOut());
|
||||
return client;
|
||||
}
|
||||
}
|
||||
|
||||
private static final class FakeClient implements MxGatewayCli.MxGatewayCliClient {
|
||||
private final PrintWriter out;
|
||||
private final FakeSession session = new FakeSession();
|
||||
private boolean closeCalled;
|
||||
|
||||
private FakeClient(PrintWriter out) {
|
||||
this.out = out;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PrintWriter out() {
|
||||
return out;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OpenSessionReply openSession(OpenSessionRequest request) {
|
||||
return OpenSessionReply.newBuilder()
|
||||
.setSessionId("session-cli")
|
||||
.setProtocolStatus(ok())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CloseSessionReply closeSession(CloseSessionRequest request) {
|
||||
closeCalled = true;
|
||||
return CloseSessionReply.newBuilder()
|
||||
.setSessionId(request.getSessionId())
|
||||
.setFinalState(SessionState.SESSION_STATE_CLOSED)
|
||||
.setProtocolStatus(ok())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public MxGatewayCli.MxGatewayCliSession session(String sessionId) {
|
||||
return session;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
}
|
||||
|
||||
private static final class FakeSession implements MxGatewayCli.MxGatewayCliSession {
|
||||
private boolean registerCalled;
|
||||
private boolean addItemCalled;
|
||||
private boolean adviseCalled;
|
||||
private MxValue lastWriteValue;
|
||||
|
||||
@Override
|
||||
public int register(String clientName) {
|
||||
registerCalled = true;
|
||||
return 42;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MxCommandReply registerRaw(String clientName) {
|
||||
registerCalled = true;
|
||||
return MxCommandReply.newBuilder()
|
||||
.setKind(MxCommandKind.MX_COMMAND_KIND_REGISTER)
|
||||
.setProtocolStatus(ok())
|
||||
.setRegister(RegisterReply.newBuilder().setServerHandle(42))
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int addItem(int serverHandle, String itemDefinition) {
|
||||
addItemCalled = true;
|
||||
return 7;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MxCommandReply addItemRaw(int serverHandle, String itemDefinition) {
|
||||
addItemCalled = true;
|
||||
return MxCommandReply.newBuilder()
|
||||
.setKind(MxCommandKind.MX_COMMAND_KIND_ADD_ITEM)
|
||||
.setProtocolStatus(ok())
|
||||
.setAddItem(AddItemReply.newBuilder().setItemHandle(7))
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void advise(int serverHandle, int itemHandle) {
|
||||
adviseCalled = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MxCommandReply adviseRaw(int serverHandle, int itemHandle) {
|
||||
adviseCalled = true;
|
||||
return MxCommandReply.newBuilder()
|
||||
.setKind(MxCommandKind.MX_COMMAND_KIND_ADVISE)
|
||||
.setProtocolStatus(ok())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public MxCommandReply writeRaw(int serverHandle, int itemHandle, MxValue value, int userId) {
|
||||
lastWriteValue = value;
|
||||
return MxCommandReply.newBuilder()
|
||||
.setKind(MxCommandKind.MX_COMMAND_KIND_WRITE)
|
||||
.setProtocolStatus(ok())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public com.dohertylan.mxgateway.client.MxEventStream streamEventsAfter(long afterWorkerSequence) {
|
||||
throw new UnsupportedOperationException("stream-events is covered by client tests");
|
||||
}
|
||||
}
|
||||
|
||||
private static ProtocolStatus ok() {
|
||||
return ProtocolStatus.newBuilder()
|
||||
.setCode(ProtocolStatusCode.PROTOCOL_STATUS_CODE_OK)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,52 @@
|
||||
plugins {
|
||||
id 'java-library'
|
||||
id 'com.google.protobuf'
|
||||
}
|
||||
|
||||
dependencies {
|
||||
api "com.google.protobuf:protobuf-java-util:${protobufVersion}"
|
||||
api "com.google.protobuf:protobuf-java:${protobufVersion}"
|
||||
api "io.grpc:grpc-protobuf:${grpcVersion}"
|
||||
api "io.grpc:grpc-stub:${grpcVersion}"
|
||||
|
||||
implementation "com.google.guava:guava:${guavaVersion}"
|
||||
implementation "io.grpc:grpc-netty-shaded:${grpcVersion}"
|
||||
|
||||
compileOnly 'javax.annotation:javax.annotation-api:1.3.2'
|
||||
|
||||
testImplementation "com.google.code.gson:gson:${gsonVersion}"
|
||||
testImplementation "io.grpc:grpc-inprocess:${grpcVersion}"
|
||||
testImplementation "io.grpc:grpc-testing:${grpcVersion}"
|
||||
}
|
||||
|
||||
sourceSets {
|
||||
main {
|
||||
proto {
|
||||
srcDir rootProject.file('../../src/MxGateway.Contracts/Protos')
|
||||
include 'mxaccess_gateway.proto'
|
||||
include 'mxaccess_worker.proto'
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protobuf {
|
||||
protoc {
|
||||
artifact = "com.google.protobuf:protoc:${protobufVersion}"
|
||||
}
|
||||
|
||||
plugins {
|
||||
grpc {
|
||||
artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}"
|
||||
}
|
||||
}
|
||||
|
||||
generatedFilesBaseDir = rootProject.file('src/main/generated').absolutePath
|
||||
|
||||
generateProtoTasks {
|
||||
all().configureEach {
|
||||
plugins {
|
||||
grpc {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
+14
@@ -0,0 +1,14 @@
|
||||
package com.dohertylan.mxgateway.client;
|
||||
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatus;
|
||||
|
||||
public final class MxAccessException extends MxGatewayCommandException {
|
||||
public MxAccessException(String operation, ProtocolStatus protocolStatus, MxCommandReply reply) {
|
||||
super(operation, protocolStatus, reply);
|
||||
}
|
||||
|
||||
public MxAccessException(String operation, MxCommandReply reply) {
|
||||
super(operation, reply == null ? null : reply.getProtocolStatus(), reply);
|
||||
}
|
||||
}
|
||||
+117
@@ -0,0 +1,117 @@
|
||||
package com.dohertylan.mxgateway.client;
|
||||
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
import io.grpc.stub.ClientCallStreamObserver;
|
||||
import io.grpc.stub.ClientResponseObserver;
|
||||
import java.util.Iterator;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest;
|
||||
|
||||
public final class MxEventStream implements Iterator<MxEvent>, AutoCloseable {
|
||||
private static final Object END = new Object();
|
||||
|
||||
private final BlockingQueue<Object> queue;
|
||||
private volatile ClientCallStreamObserver<StreamEventsRequest> requestStream;
|
||||
private volatile boolean closed;
|
||||
private Object next;
|
||||
|
||||
MxEventStream(int capacity) {
|
||||
queue = new ArrayBlockingQueue<>(capacity);
|
||||
}
|
||||
|
||||
ClientResponseObserver<StreamEventsRequest, MxEvent> observer() {
|
||||
return new ClientResponseObserver<>() {
|
||||
@Override
|
||||
public void beforeStart(ClientCallStreamObserver<StreamEventsRequest> requestStream) {
|
||||
MxEventStream.this.requestStream = requestStream;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(MxEvent value) {
|
||||
offer(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable error) {
|
||||
if (Status.fromThrowable(error).getCode() == Status.Code.CANCELLED && closed) {
|
||||
offer(END);
|
||||
return;
|
||||
}
|
||||
offer(error);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
offer(END);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
if (next == END) {
|
||||
return false;
|
||||
}
|
||||
if (next == null) {
|
||||
next = take();
|
||||
}
|
||||
if (next instanceof RuntimeException runtimeException) {
|
||||
next = END;
|
||||
throw runtimeException;
|
||||
}
|
||||
if (next instanceof Throwable throwable) {
|
||||
next = END;
|
||||
throw new MxGatewayException("gateway stream events failed: " + throwable.getMessage(), throwable);
|
||||
}
|
||||
return next != END;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MxEvent next() {
|
||||
if (!hasNext()) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
Object value = next;
|
||||
next = null;
|
||||
return (MxEvent) value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
closed = true;
|
||||
ClientCallStreamObserver<StreamEventsRequest> stream = requestStream;
|
||||
if (stream != null) {
|
||||
stream.cancel("client cancelled event stream", null);
|
||||
}
|
||||
offer(END);
|
||||
}
|
||||
|
||||
private Object take() {
|
||||
while (true) {
|
||||
try {
|
||||
return queue.take();
|
||||
} catch (InterruptedException error) {
|
||||
Thread.currentThread().interrupt();
|
||||
return new StatusRuntimeException(Status.CANCELLED.withDescription("interrupted while reading events"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void offer(Object value) {
|
||||
Objects.requireNonNull(value, "value");
|
||||
if (value == END) {
|
||||
queue.offer(value);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
queue.put(value);
|
||||
} catch (InterruptedException error) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
+37
@@ -0,0 +1,37 @@
|
||||
package com.dohertylan.mxgateway.client;
|
||||
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.Channel;
|
||||
import io.grpc.ClientCall;
|
||||
import io.grpc.ClientInterceptor;
|
||||
import io.grpc.ForwardingClientCall;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.MethodDescriptor;
|
||||
|
||||
public final class MxGatewayAuthInterceptor implements ClientInterceptor {
|
||||
static final Metadata.Key<String> AUTHORIZATION_HEADER =
|
||||
Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER);
|
||||
|
||||
private final String apiKey;
|
||||
|
||||
public MxGatewayAuthInterceptor(String apiKey) {
|
||||
this.apiKey = apiKey == null ? "" : apiKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
|
||||
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
|
||||
ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
|
||||
if (apiKey.isBlank()) {
|
||||
return call;
|
||||
}
|
||||
|
||||
return new ForwardingClientCall.SimpleForwardingClientCall<>(call) {
|
||||
@Override
|
||||
public void start(Listener<RespT> responseListener, Metadata headers) {
|
||||
headers.put(AUTHORIZATION_HEADER, "Bearer " + apiKey);
|
||||
super.start(responseListener, headers);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
+7
@@ -0,0 +1,7 @@
|
||||
package com.dohertylan.mxgateway.client;
|
||||
|
||||
public final class MxGatewayAuthenticationException extends MxGatewayException {
|
||||
public MxGatewayAuthenticationException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
}
|
||||
+7
@@ -0,0 +1,7 @@
|
||||
package com.dohertylan.mxgateway.client;
|
||||
|
||||
public final class MxGatewayAuthorizationException extends MxGatewayException {
|
||||
public MxGatewayAuthorizationException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
}
|
||||
+228
@@ -0,0 +1,228 @@
|
||||
package com.dohertylan.mxgateway.client;
|
||||
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.protobuf.Duration;
|
||||
import io.grpc.Channel;
|
||||
import io.grpc.ClientInterceptors;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
|
||||
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.net.ssl.SSLException;
|
||||
import mxaccess_gateway.v1.MxAccessGatewayGrpc;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest;
|
||||
|
||||
public final class MxGatewayClient implements AutoCloseable {
|
||||
private final ManagedChannel ownedChannel;
|
||||
private final MxGatewayClientOptions options;
|
||||
private final MxAccessGatewayGrpc.MxAccessGatewayBlockingStub blockingStub;
|
||||
private final MxAccessGatewayGrpc.MxAccessGatewayFutureStub futureStub;
|
||||
private final MxAccessGatewayGrpc.MxAccessGatewayStub asyncStub;
|
||||
|
||||
private MxGatewayClient(ManagedChannel channel, MxGatewayClientOptions options) {
|
||||
this.ownedChannel = channel;
|
||||
this.options = options;
|
||||
Channel intercepted = ClientInterceptors.intercept(channel, new MxGatewayAuthInterceptor(options.apiKey()));
|
||||
blockingStub = MxAccessGatewayGrpc.newBlockingStub(intercepted);
|
||||
futureStub = MxAccessGatewayGrpc.newFutureStub(intercepted);
|
||||
asyncStub = MxAccessGatewayGrpc.newStub(intercepted);
|
||||
}
|
||||
|
||||
public MxGatewayClient(Channel channel, MxGatewayClientOptions options) {
|
||||
this.ownedChannel = null;
|
||||
this.options = Objects.requireNonNull(options, "options");
|
||||
Channel intercepted = ClientInterceptors.intercept(channel, new MxGatewayAuthInterceptor(options.apiKey()));
|
||||
blockingStub = MxAccessGatewayGrpc.newBlockingStub(intercepted);
|
||||
futureStub = MxAccessGatewayGrpc.newFutureStub(intercepted);
|
||||
asyncStub = MxAccessGatewayGrpc.newStub(intercepted);
|
||||
}
|
||||
|
||||
public static MxGatewayClient connect(MxGatewayClientOptions options) {
|
||||
return new MxGatewayClient(createChannel(options), options);
|
||||
}
|
||||
|
||||
public MxAccessGatewayGrpc.MxAccessGatewayBlockingStub rawBlockingStub() {
|
||||
return withDeadline(blockingStub);
|
||||
}
|
||||
|
||||
public MxAccessGatewayGrpc.MxAccessGatewayFutureStub rawFutureStub() {
|
||||
return withDeadline(futureStub);
|
||||
}
|
||||
|
||||
public MxAccessGatewayGrpc.MxAccessGatewayStub rawAsyncStub() {
|
||||
return withDeadline(asyncStub);
|
||||
}
|
||||
|
||||
public MxGatewaySession openSession(OpenSessionRequest request) {
|
||||
OpenSessionReply reply = openSessionRaw(request);
|
||||
return new MxGatewaySession(this, reply);
|
||||
}
|
||||
|
||||
public MxGatewaySession openSession(String clientSessionName) {
|
||||
return openSession(OpenSessionRequest.newBuilder()
|
||||
.setClientSessionName(clientSessionName)
|
||||
.setCommandTimeout(Duration.newBuilder()
|
||||
.setSeconds(options.callTimeout().toSeconds())
|
||||
.setNanos(options.callTimeout().toNanosPart())
|
||||
.build())
|
||||
.build());
|
||||
}
|
||||
|
||||
public OpenSessionReply openSessionRaw(OpenSessionRequest request) {
|
||||
try {
|
||||
OpenSessionReply reply = rawBlockingStub().openSession(request);
|
||||
MxGatewayErrors.ensureProtocolSuccess("open session", reply.getProtocolStatus(), null);
|
||||
return reply;
|
||||
} catch (RuntimeException error) {
|
||||
if (error instanceof MxGatewayException) {
|
||||
throw error;
|
||||
}
|
||||
throw MxGatewayErrors.fromGrpc("open session", error);
|
||||
}
|
||||
}
|
||||
|
||||
public CompletableFuture<OpenSessionReply> openSessionAsync(OpenSessionRequest request) {
|
||||
CompletableFuture<OpenSessionReply> future = toCompletable(rawFutureStub().openSession(request));
|
||||
return future.thenApply(reply -> {
|
||||
MxGatewayErrors.ensureProtocolSuccess("open session", reply.getProtocolStatus(), null);
|
||||
return reply;
|
||||
});
|
||||
}
|
||||
|
||||
public MxCommandReply invoke(MxCommandRequest request) {
|
||||
try {
|
||||
MxCommandReply reply = rawBlockingStub().invoke(request);
|
||||
MxGatewayErrors.ensureProtocolSuccess("invoke", reply.getProtocolStatus(), reply);
|
||||
MxGatewayErrors.ensureMxAccessSuccess("invoke", reply);
|
||||
return reply;
|
||||
} catch (RuntimeException error) {
|
||||
if (error instanceof MxGatewayException) {
|
||||
throw error;
|
||||
}
|
||||
throw MxGatewayErrors.fromGrpc("invoke", error);
|
||||
}
|
||||
}
|
||||
|
||||
public CompletableFuture<MxCommandReply> invokeAsync(MxCommandRequest request) {
|
||||
CompletableFuture<MxCommandReply> future = toCompletable(rawFutureStub().invoke(request));
|
||||
return future.thenApply(reply -> {
|
||||
MxGatewayErrors.ensureProtocolSuccess("invoke", reply.getProtocolStatus(), reply);
|
||||
MxGatewayErrors.ensureMxAccessSuccess("invoke", reply);
|
||||
return reply;
|
||||
});
|
||||
}
|
||||
|
||||
public CloseSessionReply closeSessionRaw(CloseSessionRequest request) {
|
||||
try {
|
||||
CloseSessionReply reply = rawBlockingStub().closeSession(request);
|
||||
MxGatewayErrors.ensureProtocolSuccess("close session", reply.getProtocolStatus(), null);
|
||||
return reply;
|
||||
} catch (RuntimeException error) {
|
||||
if (error instanceof MxGatewayException) {
|
||||
throw error;
|
||||
}
|
||||
throw MxGatewayErrors.fromGrpc("close session", error);
|
||||
}
|
||||
}
|
||||
|
||||
public MxEventStream streamEvents(StreamEventsRequest request) {
|
||||
MxEventStream stream = new MxEventStream(16);
|
||||
rawAsyncStub().streamEvents(request, stream.observer());
|
||||
return stream;
|
||||
}
|
||||
|
||||
public MxGatewayEventSubscription streamEventsAsync(
|
||||
StreamEventsRequest request, StreamObserver<MxEvent> observer) {
|
||||
MxGatewayEventSubscription subscription = new MxGatewayEventSubscription();
|
||||
rawAsyncStub().streamEvents(request, subscription.wrap(observer));
|
||||
return subscription;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (ownedChannel != null) {
|
||||
ownedChannel.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
public void closeAndAwaitTermination() throws InterruptedException {
|
||||
if (ownedChannel != null) {
|
||||
ownedChannel.shutdown();
|
||||
ownedChannel.awaitTermination(options.connectTimeout().toMillis(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
private static ManagedChannel createChannel(MxGatewayClientOptions options) {
|
||||
NettyChannelBuilder builder = NettyChannelBuilder.forTarget(options.endpoint())
|
||||
.maxInboundMessageSize(16 * 1024 * 1024);
|
||||
if (!options.connectTimeout().isNegative()) {
|
||||
builder.withOption(
|
||||
io.grpc.netty.shaded.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS,
|
||||
Math.toIntExact(options.connectTimeout().toMillis()));
|
||||
}
|
||||
if (options.plaintext()) {
|
||||
builder.usePlaintext();
|
||||
} else if (options.caCertificatePath() != null) {
|
||||
try {
|
||||
builder.sslContext(GrpcSslContexts.forClient()
|
||||
.trustManager(options.caCertificatePath().toFile())
|
||||
.build());
|
||||
} catch (SSLException error) {
|
||||
throw new MxGatewayException("failed to configure gateway TLS", error);
|
||||
}
|
||||
} else {
|
||||
builder.useTransportSecurity();
|
||||
}
|
||||
if (!options.serverNameOverride().isBlank()) {
|
||||
builder.overrideAuthority(options.serverNameOverride());
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
private <T extends io.grpc.stub.AbstractStub<T>> T withDeadline(T stub) {
|
||||
if (options.callTimeout().isNegative()) {
|
||||
return stub;
|
||||
}
|
||||
return stub.withDeadlineAfter(options.callTimeout().toNanos(), TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
private static <T> CompletableFuture<T> toCompletable(com.google.common.util.concurrent.ListenableFuture<T> source) {
|
||||
CompletableFuture<T> target = new CompletableFuture<>();
|
||||
Futures.addCallback(
|
||||
source,
|
||||
new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(T result) {
|
||||
target.complete(result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable error) {
|
||||
if (error instanceof RuntimeException runtimeException) {
|
||||
target.completeExceptionally(MxGatewayErrors.fromGrpc("async call", runtimeException));
|
||||
return;
|
||||
}
|
||||
target.completeExceptionally(error);
|
||||
}
|
||||
},
|
||||
MoreExecutors.directExecutor());
|
||||
return target;
|
||||
}
|
||||
|
||||
static ProtocolStatusCode okStatusCode() {
|
||||
return ProtocolStatusCode.PROTOCOL_STATUS_CODE_OK;
|
||||
}
|
||||
}
|
||||
+146
@@ -0,0 +1,146 @@
|
||||
package com.dohertylan.mxgateway.client;
|
||||
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.util.Objects;
|
||||
|
||||
public final class MxGatewayClientOptions {
|
||||
private static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(10);
|
||||
private static final Duration DEFAULT_CALL_TIMEOUT = Duration.ofSeconds(30);
|
||||
|
||||
private final String endpoint;
|
||||
private final String apiKey;
|
||||
private final boolean plaintext;
|
||||
private final Path caCertificatePath;
|
||||
private final String serverNameOverride;
|
||||
private final Duration connectTimeout;
|
||||
private final Duration callTimeout;
|
||||
|
||||
private MxGatewayClientOptions(Builder builder) {
|
||||
endpoint = requireText(builder.endpoint, "endpoint");
|
||||
apiKey = builder.apiKey == null ? "" : builder.apiKey;
|
||||
plaintext = builder.plaintext;
|
||||
caCertificatePath = builder.caCertificatePath;
|
||||
serverNameOverride = builder.serverNameOverride == null ? "" : builder.serverNameOverride;
|
||||
connectTimeout = builder.connectTimeout == null ? DEFAULT_CONNECT_TIMEOUT : builder.connectTimeout;
|
||||
callTimeout = builder.callTimeout == null ? DEFAULT_CALL_TIMEOUT : builder.callTimeout;
|
||||
}
|
||||
|
||||
public static Builder builder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
public String endpoint() {
|
||||
return endpoint;
|
||||
}
|
||||
|
||||
public String apiKey() {
|
||||
return apiKey;
|
||||
}
|
||||
|
||||
public String redactedApiKey() {
|
||||
return MxGatewaySecrets.redactApiKey(apiKey);
|
||||
}
|
||||
|
||||
public boolean plaintext() {
|
||||
return plaintext;
|
||||
}
|
||||
|
||||
public Path caCertificatePath() {
|
||||
return caCertificatePath;
|
||||
}
|
||||
|
||||
public String serverNameOverride() {
|
||||
return serverNameOverride;
|
||||
}
|
||||
|
||||
public Duration connectTimeout() {
|
||||
return connectTimeout;
|
||||
}
|
||||
|
||||
public Duration callTimeout() {
|
||||
return callTimeout;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "MxGatewayClientOptions{"
|
||||
+ "endpoint='"
|
||||
+ endpoint
|
||||
+ '\''
|
||||
+ ", apiKey='"
|
||||
+ redactedApiKey()
|
||||
+ '\''
|
||||
+ ", plaintext="
|
||||
+ plaintext
|
||||
+ ", caCertificatePath="
|
||||
+ caCertificatePath
|
||||
+ ", serverNameOverride='"
|
||||
+ serverNameOverride
|
||||
+ '\''
|
||||
+ ", connectTimeout="
|
||||
+ connectTimeout
|
||||
+ ", callTimeout="
|
||||
+ callTimeout
|
||||
+ '}';
|
||||
}
|
||||
|
||||
private static String requireText(String value, String name) {
|
||||
if (value == null || value.isBlank()) {
|
||||
throw new IllegalArgumentException(name + " is required");
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
public static final class Builder {
|
||||
private String endpoint;
|
||||
private String apiKey;
|
||||
private boolean plaintext;
|
||||
private Path caCertificatePath;
|
||||
private String serverNameOverride;
|
||||
private Duration connectTimeout;
|
||||
private Duration callTimeout;
|
||||
|
||||
private Builder() {
|
||||
}
|
||||
|
||||
public Builder endpoint(String value) {
|
||||
endpoint = value;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder apiKey(String value) {
|
||||
apiKey = value;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder plaintext(boolean value) {
|
||||
plaintext = value;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder caCertificatePath(Path value) {
|
||||
caCertificatePath = value;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder serverNameOverride(String value) {
|
||||
serverNameOverride = value;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder connectTimeout(Duration value) {
|
||||
connectTimeout = Objects.requireNonNull(value, "connectTimeout");
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder callTimeout(Duration value) {
|
||||
callTimeout = Objects.requireNonNull(value, "callTimeout");
|
||||
return this;
|
||||
}
|
||||
|
||||
public MxGatewayClientOptions build() {
|
||||
return new MxGatewayClientOptions(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
+22
@@ -0,0 +1,22 @@
|
||||
package com.dohertylan.mxgateway.client;
|
||||
|
||||
public final class MxGatewayClientVersion {
|
||||
private static final int GATEWAY_PROTOCOL_VERSION = 1;
|
||||
private static final int WORKER_PROTOCOL_VERSION = 1;
|
||||
private static final String CLIENT_VERSION = "0.1.0";
|
||||
|
||||
private MxGatewayClientVersion() {
|
||||
}
|
||||
|
||||
public static String clientVersion() {
|
||||
return CLIENT_VERSION;
|
||||
}
|
||||
|
||||
public static int gatewayProtocolVersion() {
|
||||
return GATEWAY_PROTOCOL_VERSION;
|
||||
}
|
||||
|
||||
public static int workerProtocolVersion() {
|
||||
return WORKER_PROTOCOL_VERSION;
|
||||
}
|
||||
}
|
||||
+23
@@ -0,0 +1,23 @@
|
||||
package com.dohertylan.mxgateway.client;
|
||||
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatus;
|
||||
|
||||
public class MxGatewayCommandException extends MxGatewayException {
|
||||
private final ProtocolStatus protocolStatus;
|
||||
private final MxCommandReply reply;
|
||||
|
||||
public MxGatewayCommandException(String operation, ProtocolStatus protocolStatus, MxCommandReply reply) {
|
||||
super(MxGatewayErrors.protocolStatusMessage(operation, protocolStatus));
|
||||
this.protocolStatus = protocolStatus;
|
||||
this.reply = reply;
|
||||
}
|
||||
|
||||
public ProtocolStatus protocolStatus() {
|
||||
return protocolStatus;
|
||||
}
|
||||
|
||||
public MxCommandReply reply() {
|
||||
return reply;
|
||||
}
|
||||
}
|
||||
+72
@@ -0,0 +1,72 @@
|
||||
package com.dohertylan.mxgateway.client;
|
||||
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatus;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode;
|
||||
|
||||
final class MxGatewayErrors {
|
||||
private MxGatewayErrors() {
|
||||
}
|
||||
|
||||
static RuntimeException fromGrpc(String operation, RuntimeException error) {
|
||||
if (error instanceof StatusRuntimeException statusError) {
|
||||
Status status = statusError.getStatus();
|
||||
String message = MxGatewaySecrets.redactCredentials(status.getDescription());
|
||||
return switch (status.getCode()) {
|
||||
case UNAUTHENTICATED -> new MxGatewayAuthenticationException(
|
||||
"authentication failed: " + message, statusError);
|
||||
case PERMISSION_DENIED -> new MxGatewayAuthorizationException(
|
||||
"authorization failed: " + message, statusError);
|
||||
case DEADLINE_EXCEEDED -> new MxGatewayException("gateway call timed out: " + message, statusError);
|
||||
case CANCELLED -> new MxGatewayException("gateway call cancelled: " + message, statusError);
|
||||
default -> new MxGatewayException("gateway " + operation + " failed: " + message, statusError);
|
||||
};
|
||||
}
|
||||
|
||||
return new MxGatewayException("gateway " + operation + " failed: " + error.getMessage(), error);
|
||||
}
|
||||
|
||||
static void ensureProtocolSuccess(String operation, ProtocolStatus status, MxCommandReply reply) {
|
||||
if (status == null || status.getCode() == ProtocolStatusCode.PROTOCOL_STATUS_CODE_OK) {
|
||||
return;
|
||||
}
|
||||
|
||||
throw switch (status.getCode()) {
|
||||
case PROTOCOL_STATUS_CODE_SESSION_NOT_FOUND, PROTOCOL_STATUS_CODE_SESSION_NOT_READY ->
|
||||
new MxGatewaySessionException(operation, status);
|
||||
case PROTOCOL_STATUS_CODE_WORKER_UNAVAILABLE, PROTOCOL_STATUS_CODE_PROTOCOL_VIOLATION ->
|
||||
new MxGatewayWorkerException(operation, status);
|
||||
case PROTOCOL_STATUS_CODE_MXACCESS_FAILURE -> new MxAccessException(operation, status, reply);
|
||||
default -> new MxGatewayCommandException(operation, status, reply);
|
||||
};
|
||||
}
|
||||
|
||||
static void ensureMxAccessSuccess(String operation, MxCommandReply reply) {
|
||||
if (reply == null) {
|
||||
return;
|
||||
}
|
||||
if (reply.hasHresult() && reply.getHresult() != 0) {
|
||||
throw new MxAccessException(operation, reply);
|
||||
}
|
||||
for (var status : reply.getStatusesList()) {
|
||||
if (!MxStatuses.succeeded(status)) {
|
||||
throw new MxAccessException(operation, reply);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static String protocolStatusMessage(String operation, ProtocolStatus status) {
|
||||
if (status == null) {
|
||||
return "mxgateway " + operation + " failed with missing protocol status";
|
||||
}
|
||||
if (status.getMessage().isBlank()) {
|
||||
return "mxgateway " + operation + " failed with protocol status " + status.getCode();
|
||||
}
|
||||
return "mxgateway " + operation + " failed with protocol status "
|
||||
+ status.getCode()
|
||||
+ ": "
|
||||
+ status.getMessage();
|
||||
}
|
||||
}
|
||||
+48
@@ -0,0 +1,48 @@
|
||||
package com.dohertylan.mxgateway.client;
|
||||
|
||||
import io.grpc.stub.ClientCallStreamObserver;
|
||||
import io.grpc.stub.ClientResponseObserver;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest;
|
||||
|
||||
public final class MxGatewayEventSubscription implements AutoCloseable {
|
||||
private final AtomicReference<ClientCallStreamObserver<StreamEventsRequest>> requestStream = new AtomicReference<>();
|
||||
|
||||
ClientResponseObserver<StreamEventsRequest, MxEvent> wrap(StreamObserver<MxEvent> observer) {
|
||||
return new ClientResponseObserver<>() {
|
||||
@Override
|
||||
public void beforeStart(ClientCallStreamObserver<StreamEventsRequest> stream) {
|
||||
requestStream.set(stream);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(MxEvent value) {
|
||||
observer.onNext(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable error) {
|
||||
observer.onError(error);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
observer.onCompleted();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public void cancel() {
|
||||
ClientCallStreamObserver<StreamEventsRequest> stream = requestStream.get();
|
||||
if (stream != null) {
|
||||
stream.cancel("client cancelled event stream", null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
cancel();
|
||||
}
|
||||
}
|
||||
+11
@@ -0,0 +1,11 @@
|
||||
package com.dohertylan.mxgateway.client;
|
||||
|
||||
public class MxGatewayException extends RuntimeException {
|
||||
public MxGatewayException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public MxGatewayException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
}
|
||||
+33
@@ -0,0 +1,33 @@
|
||||
package com.dohertylan.mxgateway.client;
|
||||
|
||||
public final class MxGatewaySecrets {
|
||||
private MxGatewaySecrets() {
|
||||
}
|
||||
|
||||
public static String redactApiKey(String apiKey) {
|
||||
if (apiKey == null || apiKey.isEmpty()) {
|
||||
return "";
|
||||
}
|
||||
if (apiKey.length() <= 8) {
|
||||
return "<redacted>";
|
||||
}
|
||||
|
||||
return apiKey.substring(0, 4)
|
||||
+ "*".repeat(apiKey.length() - 8)
|
||||
+ apiKey.substring(apiKey.length() - 4);
|
||||
}
|
||||
|
||||
public static String redactCredentials(String value) {
|
||||
if (value == null || value.isBlank()) {
|
||||
return value == null ? "" : value;
|
||||
}
|
||||
|
||||
String[] parts = value.split("\\s+");
|
||||
for (int index = 0; index < parts.length; index++) {
|
||||
if (parts[index].startsWith("mxgw_") || parts[index].equalsIgnoreCase("bearer")) {
|
||||
parts[index] = "<redacted>";
|
||||
}
|
||||
}
|
||||
return String.join(" ", parts);
|
||||
}
|
||||
}
|
||||
+184
@@ -0,0 +1,184 @@
|
||||
package com.dohertylan.mxgateway.client;
|
||||
|
||||
import java.security.SecureRandom;
|
||||
import java.util.HexFormat;
|
||||
import java.util.Objects;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.AddItem2Command;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.AddItemCommand;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.AdviseCommand;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxCommand;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxCommandKind;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxValue;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.RegisterCommand;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.UnregisterCommand;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.Write2Command;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.WriteCommand;
|
||||
|
||||
public final class MxGatewaySession implements AutoCloseable {
|
||||
private static final SecureRandom RANDOM = new SecureRandom();
|
||||
|
||||
private final MxGatewayClient client;
|
||||
private final OpenSessionReply openReply;
|
||||
private CloseSessionReply closeReply;
|
||||
|
||||
MxGatewaySession(MxGatewayClient client, OpenSessionReply openReply) {
|
||||
this.client = Objects.requireNonNull(client, "client");
|
||||
this.openReply = Objects.requireNonNull(openReply, "openReply");
|
||||
}
|
||||
|
||||
public static MxGatewaySession forSessionId(MxGatewayClient client, String sessionId) {
|
||||
return new MxGatewaySession(
|
||||
client, OpenSessionReply.newBuilder().setSessionId(sessionId).build());
|
||||
}
|
||||
|
||||
public String sessionId() {
|
||||
return openReply.getSessionId();
|
||||
}
|
||||
|
||||
public OpenSessionReply openReply() {
|
||||
return openReply;
|
||||
}
|
||||
|
||||
public synchronized CloseSessionReply closeRaw() {
|
||||
if (closeReply == null) {
|
||||
closeReply = client.closeSessionRaw(CloseSessionRequest.newBuilder()
|
||||
.setSessionId(sessionId())
|
||||
.setClientCorrelationId(newCorrelationId())
|
||||
.build());
|
||||
}
|
||||
return closeReply;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
closeRaw();
|
||||
}
|
||||
|
||||
public int register(String clientName) {
|
||||
MxCommandReply reply = registerRaw(clientName);
|
||||
if (reply.hasRegister()) {
|
||||
return reply.getRegister().getServerHandle();
|
||||
}
|
||||
return reply.getReturnValue().getInt32Value();
|
||||
}
|
||||
|
||||
public MxCommandReply registerRaw(String clientName) {
|
||||
return invokeCommand(MxCommand.newBuilder()
|
||||
.setKind(MxCommandKind.MX_COMMAND_KIND_REGISTER)
|
||||
.setRegister(RegisterCommand.newBuilder().setClientName(clientName))
|
||||
.build());
|
||||
}
|
||||
|
||||
public void unregister(int serverHandle) {
|
||||
invokeCommand(MxCommand.newBuilder()
|
||||
.setKind(MxCommandKind.MX_COMMAND_KIND_UNREGISTER)
|
||||
.setUnregister(UnregisterCommand.newBuilder().setServerHandle(serverHandle))
|
||||
.build());
|
||||
}
|
||||
|
||||
public int addItem(int serverHandle, String itemDefinition) {
|
||||
MxCommandReply reply = addItemRaw(serverHandle, itemDefinition);
|
||||
if (reply.hasAddItem()) {
|
||||
return reply.getAddItem().getItemHandle();
|
||||
}
|
||||
return reply.getReturnValue().getInt32Value();
|
||||
}
|
||||
|
||||
public MxCommandReply addItemRaw(int serverHandle, String itemDefinition) {
|
||||
return invokeCommand(MxCommand.newBuilder()
|
||||
.setKind(MxCommandKind.MX_COMMAND_KIND_ADD_ITEM)
|
||||
.setAddItem(AddItemCommand.newBuilder()
|
||||
.setServerHandle(serverHandle)
|
||||
.setItemDefinition(itemDefinition))
|
||||
.build());
|
||||
}
|
||||
|
||||
public int addItem2(int serverHandle, String itemDefinition, String itemContext) {
|
||||
MxCommandReply reply = addItem2Raw(serverHandle, itemDefinition, itemContext);
|
||||
if (reply.hasAddItem2()) {
|
||||
return reply.getAddItem2().getItemHandle();
|
||||
}
|
||||
return reply.getReturnValue().getInt32Value();
|
||||
}
|
||||
|
||||
public MxCommandReply addItem2Raw(int serverHandle, String itemDefinition, String itemContext) {
|
||||
return invokeCommand(MxCommand.newBuilder()
|
||||
.setKind(MxCommandKind.MX_COMMAND_KIND_ADD_ITEM2)
|
||||
.setAddItem2(AddItem2Command.newBuilder()
|
||||
.setServerHandle(serverHandle)
|
||||
.setItemDefinition(itemDefinition)
|
||||
.setItemContext(itemContext))
|
||||
.build());
|
||||
}
|
||||
|
||||
public void advise(int serverHandle, int itemHandle) {
|
||||
adviseRaw(serverHandle, itemHandle);
|
||||
}
|
||||
|
||||
public MxCommandReply adviseRaw(int serverHandle, int itemHandle) {
|
||||
return invokeCommand(MxCommand.newBuilder()
|
||||
.setKind(MxCommandKind.MX_COMMAND_KIND_ADVISE)
|
||||
.setAdvise(AdviseCommand.newBuilder()
|
||||
.setServerHandle(serverHandle)
|
||||
.setItemHandle(itemHandle))
|
||||
.build());
|
||||
}
|
||||
|
||||
public void write(int serverHandle, int itemHandle, MxValue value, int userId) {
|
||||
writeRaw(serverHandle, itemHandle, value, userId);
|
||||
}
|
||||
|
||||
public MxCommandReply writeRaw(int serverHandle, int itemHandle, MxValue value, int userId) {
|
||||
return invokeCommand(MxCommand.newBuilder()
|
||||
.setKind(MxCommandKind.MX_COMMAND_KIND_WRITE)
|
||||
.setWrite(WriteCommand.newBuilder()
|
||||
.setServerHandle(serverHandle)
|
||||
.setItemHandle(itemHandle)
|
||||
.setValue(value)
|
||||
.setUserId(userId))
|
||||
.build());
|
||||
}
|
||||
|
||||
public void write2(int serverHandle, int itemHandle, MxValue value, MxValue timestampValue, int userId) {
|
||||
invokeCommand(MxCommand.newBuilder()
|
||||
.setKind(MxCommandKind.MX_COMMAND_KIND_WRITE2)
|
||||
.setWrite2(Write2Command.newBuilder()
|
||||
.setServerHandle(serverHandle)
|
||||
.setItemHandle(itemHandle)
|
||||
.setValue(value)
|
||||
.setTimestampValue(timestampValue)
|
||||
.setUserId(userId))
|
||||
.build());
|
||||
}
|
||||
|
||||
public MxEventStream streamEvents() {
|
||||
return streamEventsAfter(0);
|
||||
}
|
||||
|
||||
public MxEventStream streamEventsAfter(long afterWorkerSequence) {
|
||||
return client.streamEvents(StreamEventsRequest.newBuilder()
|
||||
.setSessionId(sessionId())
|
||||
.setAfterWorkerSequence(afterWorkerSequence)
|
||||
.build());
|
||||
}
|
||||
|
||||
public MxCommandReply invokeCommand(MxCommand command) {
|
||||
return client.invoke(MxCommandRequest.newBuilder()
|
||||
.setSessionId(sessionId())
|
||||
.setClientCorrelationId(newCorrelationId())
|
||||
.setCommand(command)
|
||||
.build());
|
||||
}
|
||||
|
||||
private static String newCorrelationId() {
|
||||
byte[] bytes = new byte[16];
|
||||
RANDOM.nextBytes(bytes);
|
||||
return HexFormat.of().formatHex(bytes);
|
||||
}
|
||||
}
|
||||
+16
@@ -0,0 +1,16 @@
|
||||
package com.dohertylan.mxgateway.client;
|
||||
|
||||
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatus;
|
||||
|
||||
public final class MxGatewaySessionException extends MxGatewayException {
|
||||
private final ProtocolStatus protocolStatus;
|
||||
|
||||
public MxGatewaySessionException(String operation, ProtocolStatus protocolStatus) {
|
||||
super(MxGatewayErrors.protocolStatusMessage(operation, protocolStatus));
|
||||
this.protocolStatus = protocolStatus;
|
||||
}
|
||||
|
||||
public ProtocolStatus protocolStatus() {
|
||||
return protocolStatus;
|
||||
}
|
||||
}
|
||||
+16
@@ -0,0 +1,16 @@
|
||||
package com.dohertylan.mxgateway.client;
|
||||
|
||||
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatus;
|
||||
|
||||
public final class MxGatewayWorkerException extends MxGatewayException {
|
||||
private final ProtocolStatus protocolStatus;
|
||||
|
||||
public MxGatewayWorkerException(String operation, ProtocolStatus protocolStatus) {
|
||||
super(MxGatewayErrors.protocolStatusMessage(operation, protocolStatus));
|
||||
this.protocolStatus = protocolStatus;
|
||||
}
|
||||
|
||||
public ProtocolStatus protocolStatus() {
|
||||
return protocolStatus;
|
||||
}
|
||||
}
|
||||
+48
@@ -0,0 +1,48 @@
|
||||
package com.dohertylan.mxgateway.client;
|
||||
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxStatusCategory;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxStatusProxy;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxStatusSource;
|
||||
|
||||
public final class MxStatuses {
|
||||
private MxStatuses() {
|
||||
}
|
||||
|
||||
public static boolean succeeded(MxStatusProxy status) {
|
||||
return status == null || status.getSuccess() != 0;
|
||||
}
|
||||
|
||||
public static MxStatusView view(MxStatusProxy status) {
|
||||
return new MxStatusView(status);
|
||||
}
|
||||
|
||||
public record MxStatusView(MxStatusProxy raw) {
|
||||
public int success() {
|
||||
return raw.getSuccess();
|
||||
}
|
||||
|
||||
public MxStatusCategory category() {
|
||||
return raw.getCategory();
|
||||
}
|
||||
|
||||
public MxStatusSource detectedBy() {
|
||||
return raw.getDetectedBy();
|
||||
}
|
||||
|
||||
public int detail() {
|
||||
return raw.getDetail();
|
||||
}
|
||||
|
||||
public int rawCategory() {
|
||||
return raw.getRawCategory();
|
||||
}
|
||||
|
||||
public int rawDetectedBy() {
|
||||
return raw.getRawDetectedBy();
|
||||
}
|
||||
|
||||
public String diagnosticText() {
|
||||
return raw.getDiagnosticText();
|
||||
}
|
||||
}
|
||||
}
|
||||
+170
@@ -0,0 +1,170 @@
|
||||
package com.dohertylan.mxgateway.client;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.Timestamp;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.BoolArray;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.DoubleArray;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.FloatArray;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.Int32Array;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.Int64Array;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxArray;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxDataType;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxValue;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.RawArray;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.StringArray;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.TimestampArray;
|
||||
|
||||
public final class MxValues {
|
||||
private MxValues() {
|
||||
}
|
||||
|
||||
public static MxValue boolValue(boolean value) {
|
||||
return MxValue.newBuilder()
|
||||
.setDataType(MxDataType.MX_DATA_TYPE_BOOLEAN)
|
||||
.setVariantType("VT_BOOL")
|
||||
.setBoolValue(value)
|
||||
.build();
|
||||
}
|
||||
|
||||
public static MxValue int32Value(int value) {
|
||||
return MxValue.newBuilder()
|
||||
.setDataType(MxDataType.MX_DATA_TYPE_INTEGER)
|
||||
.setVariantType("VT_I4")
|
||||
.setInt32Value(value)
|
||||
.build();
|
||||
}
|
||||
|
||||
public static MxValue int64Value(long value) {
|
||||
return MxValue.newBuilder()
|
||||
.setDataType(MxDataType.MX_DATA_TYPE_INTEGER)
|
||||
.setVariantType("VT_I8")
|
||||
.setInt64Value(value)
|
||||
.build();
|
||||
}
|
||||
|
||||
public static MxValue floatValue(float value) {
|
||||
return MxValue.newBuilder()
|
||||
.setDataType(MxDataType.MX_DATA_TYPE_FLOAT)
|
||||
.setVariantType("VT_R4")
|
||||
.setFloatValue(value)
|
||||
.build();
|
||||
}
|
||||
|
||||
public static MxValue doubleValue(double value) {
|
||||
return MxValue.newBuilder()
|
||||
.setDataType(MxDataType.MX_DATA_TYPE_DOUBLE)
|
||||
.setVariantType("VT_R8")
|
||||
.setDoubleValue(value)
|
||||
.build();
|
||||
}
|
||||
|
||||
public static MxValue stringValue(String value) {
|
||||
return MxValue.newBuilder()
|
||||
.setDataType(MxDataType.MX_DATA_TYPE_STRING)
|
||||
.setVariantType("VT_BSTR")
|
||||
.setStringValue(value)
|
||||
.build();
|
||||
}
|
||||
|
||||
public static MxValue timestampValue(Instant value) {
|
||||
return MxValue.newBuilder()
|
||||
.setDataType(MxDataType.MX_DATA_TYPE_TIME)
|
||||
.setVariantType("VT_DATE")
|
||||
.setTimestampValue(Timestamp.newBuilder()
|
||||
.setSeconds(value.getEpochSecond())
|
||||
.setNanos(value.getNano())
|
||||
.build())
|
||||
.build();
|
||||
}
|
||||
|
||||
public static Object nativeValue(MxValue value) {
|
||||
if (value == null || value.getIsNull()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return switch (value.getKindCase()) {
|
||||
case BOOL_VALUE -> value.getBoolValue();
|
||||
case INT32_VALUE -> value.getInt32Value();
|
||||
case INT64_VALUE -> value.getInt64Value();
|
||||
case FLOAT_VALUE -> value.getFloatValue();
|
||||
case DOUBLE_VALUE -> value.getDoubleValue();
|
||||
case STRING_VALUE -> value.getStringValue();
|
||||
case TIMESTAMP_VALUE -> instant(value.getTimestampValue());
|
||||
case ARRAY_VALUE -> nativeArray(value.getArrayValue());
|
||||
case RAW_VALUE -> value.getRawValue().toByteArray();
|
||||
case KIND_NOT_SET -> null;
|
||||
};
|
||||
}
|
||||
|
||||
public static Object nativeArray(MxArray array) {
|
||||
if (array == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return switch (array.getValuesCase()) {
|
||||
case BOOL_VALUES -> List.copyOf(array.getBoolValues().getValuesList());
|
||||
case INT32_VALUES -> List.copyOf(array.getInt32Values().getValuesList());
|
||||
case INT64_VALUES -> List.copyOf(array.getInt64Values().getValuesList());
|
||||
case FLOAT_VALUES -> List.copyOf(array.getFloatValues().getValuesList());
|
||||
case DOUBLE_VALUES -> List.copyOf(array.getDoubleValues().getValuesList());
|
||||
case STRING_VALUES -> List.copyOf(array.getStringValues().getValuesList());
|
||||
case TIMESTAMP_VALUES -> timestampValues(array.getTimestampValues());
|
||||
case RAW_VALUES -> rawValues(array.getRawValues());
|
||||
case VALUES_NOT_SET -> List.of();
|
||||
};
|
||||
}
|
||||
|
||||
public static MxArray stringArray(List<String> values) {
|
||||
return MxArray.newBuilder()
|
||||
.setElementDataType(MxDataType.MX_DATA_TYPE_STRING)
|
||||
.setVariantType("VT_ARRAY|VT_BSTR")
|
||||
.addDimensions(values.size())
|
||||
.setStringValues(StringArray.newBuilder().addAllValues(values))
|
||||
.build();
|
||||
}
|
||||
|
||||
public static MxArray int32Array(List<Integer> values) {
|
||||
return MxArray.newBuilder()
|
||||
.setElementDataType(MxDataType.MX_DATA_TYPE_INTEGER)
|
||||
.setVariantType("VT_ARRAY|VT_I4")
|
||||
.addDimensions(values.size())
|
||||
.setInt32Values(Int32Array.newBuilder().addAllValues(values))
|
||||
.build();
|
||||
}
|
||||
|
||||
public static String kindName(MxValue value) {
|
||||
return value == null ? "KIND_NOT_SET" : value.getKindCase().name();
|
||||
}
|
||||
|
||||
private static Instant instant(Timestamp timestamp) {
|
||||
return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos());
|
||||
}
|
||||
|
||||
private static List<Instant> timestampValues(TimestampArray array) {
|
||||
List<Instant> values = new ArrayList<>();
|
||||
for (Timestamp timestamp : array.getValuesList()) {
|
||||
values.add(instant(timestamp));
|
||||
}
|
||||
return values;
|
||||
}
|
||||
|
||||
private static List<byte[]> rawValues(RawArray array) {
|
||||
List<byte[]> values = new ArrayList<>();
|
||||
for (ByteString rawValue : array.getValuesList()) {
|
||||
values.add(rawValue.toByteArray());
|
||||
}
|
||||
return values;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
private static void generatedTypeReferences(
|
||||
BoolArray boolArray,
|
||||
Int64Array int64Array,
|
||||
FloatArray floatArray,
|
||||
DoubleArray doubleArray) {
|
||||
// Keeps generated repeated-value imports visible for javadocs and IDE navigation.
|
||||
}
|
||||
}
|
||||
+29
@@ -0,0 +1,29 @@
|
||||
package com.dohertylan.mxgateway.client;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
import mxaccess_gateway.v1.MxAccessGatewayGrpc;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest;
|
||||
import mxaccess_worker.v1.MxaccessWorker.WorkerEnvelope;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
final class GeneratedContractSmokeTests {
|
||||
@Test
|
||||
void generatedGatewayAndWorkerContractsCompile() {
|
||||
OpenSessionRequest request = OpenSessionRequest.newBuilder()
|
||||
.setClientSessionName("junit")
|
||||
.build();
|
||||
WorkerEnvelope envelope = WorkerEnvelope.newBuilder()
|
||||
.setProtocolVersion(MxGatewayClientVersion.workerProtocolVersion())
|
||||
.build();
|
||||
|
||||
assertEquals("junit", request.getClientSessionName());
|
||||
assertEquals("mxaccess_gateway.v1.MxAccessGateway", MxAccessGatewayGrpc.SERVICE_NAME);
|
||||
assertEquals(1, envelope.getProtocolVersion());
|
||||
}
|
||||
|
||||
@Test
|
||||
void javaTwentyOneToolchainRunsTests() {
|
||||
assertEquals(21, Runtime.version().feature());
|
||||
}
|
||||
}
|
||||
+243
@@ -0,0 +1,243 @@
|
||||
package com.dohertylan.mxgateway.client;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import io.grpc.Context;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.Server;
|
||||
import io.grpc.ServerCall;
|
||||
import io.grpc.ServerCallHandler;
|
||||
import io.grpc.ServerInterceptor;
|
||||
import io.grpc.inprocess.InProcessChannelBuilder;
|
||||
import io.grpc.inprocess.InProcessServerBuilder;
|
||||
import io.grpc.stub.ServerCallStreamObserver;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import java.time.Duration;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import mxaccess_gateway.v1.MxAccessGatewayGrpc;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.AddItemReply;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxCommandKind;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatus;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.RegisterReply;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.SessionState;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
final class MxGatewayClientSessionTests {
|
||||
@Test
|
||||
void unaryCallsCarryAuthMetadataAndDeadline() throws Exception {
|
||||
AtomicReference<String> authorization = new AtomicReference<>();
|
||||
AtomicReference<MxCommandRequest> commandRequest = new AtomicReference<>();
|
||||
AtomicReference<Boolean> deadlineSeen = new AtomicReference<>(false);
|
||||
|
||||
TestGatewayService service = new TestGatewayService() {
|
||||
@Override
|
||||
public void openSession(OpenSessionRequest request, StreamObserver<OpenSessionReply> responseObserver) {
|
||||
deadlineSeen.set(Context.current().getDeadline() != null);
|
||||
responseObserver.onNext(OpenSessionReply.newBuilder()
|
||||
.setSessionId("session-java")
|
||||
.setProtocolStatus(ok())
|
||||
.build());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void invoke(MxCommandRequest request, StreamObserver<MxCommandReply> responseObserver) {
|
||||
commandRequest.set(request);
|
||||
responseObserver.onNext(MxCommandReply.newBuilder()
|
||||
.setSessionId(request.getSessionId())
|
||||
.setKind(request.getCommand().getKind())
|
||||
.setProtocolStatus(ok())
|
||||
.setRegister(RegisterReply.newBuilder().setServerHandle(42))
|
||||
.build());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
};
|
||||
|
||||
try (InProcessGateway gateway = InProcessGateway.start(service, authorization);
|
||||
MxGatewayClient client = gateway.client("mxgw_visible_secret", Duration.ofSeconds(5))) {
|
||||
MxGatewaySession session = client.openSession("junit-session");
|
||||
|
||||
int serverHandle = session.register("java-test-client");
|
||||
|
||||
assertEquals(42, serverHandle);
|
||||
assertEquals("Bearer mxgw_visible_secret", authorization.get());
|
||||
assertEquals("session-java", commandRequest.get().getSessionId());
|
||||
assertEquals(MxCommandKind.MX_COMMAND_KIND_REGISTER, commandRequest.get().getCommand().getKind());
|
||||
assertTrue(deadlineSeen.get());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void methodHelpersReturnTypedHandlesAndRawReplies() throws Exception {
|
||||
TestGatewayService service = new TestGatewayService() {
|
||||
@Override
|
||||
public void invoke(MxCommandRequest request, StreamObserver<MxCommandReply> responseObserver) {
|
||||
MxCommandReply.Builder reply = MxCommandReply.newBuilder()
|
||||
.setSessionId(request.getSessionId())
|
||||
.setKind(request.getCommand().getKind())
|
||||
.setProtocolStatus(ok());
|
||||
if (request.getCommand().getKind() == MxCommandKind.MX_COMMAND_KIND_ADD_ITEM) {
|
||||
reply.setAddItem(AddItemReply.newBuilder().setItemHandle(7));
|
||||
}
|
||||
responseObserver.onNext(reply.build());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
};
|
||||
|
||||
try (InProcessGateway gateway = InProcessGateway.start(service, new AtomicReference<>());
|
||||
MxGatewayClient client = gateway.client("", Duration.ofSeconds(5))) {
|
||||
MxGatewaySession session = MxGatewaySession.forSessionId(client, "existing-session");
|
||||
|
||||
int itemHandle = session.addItem(12, "TestObject.TestInt");
|
||||
MxCommandReply raw = session.adviseRaw(12, itemHandle);
|
||||
|
||||
assertEquals(7, itemHandle);
|
||||
assertEquals(MxCommandKind.MX_COMMAND_KIND_ADVISE, raw.getKind());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void streamCancellationCancelsServerCall() throws Exception {
|
||||
CountDownLatch cancelled = new CountDownLatch(1);
|
||||
TestGatewayService service = new TestGatewayService() {
|
||||
@Override
|
||||
public void streamEvents(StreamEventsRequest request, StreamObserver<MxEvent> responseObserver) {
|
||||
ServerCallStreamObserver<MxEvent> serverObserver =
|
||||
(ServerCallStreamObserver<MxEvent>) responseObserver;
|
||||
serverObserver.setOnCancelHandler(cancelled::countDown);
|
||||
responseObserver.onNext(MxEvent.newBuilder()
|
||||
.setSessionId(request.getSessionId())
|
||||
.setWorkerSequence(1)
|
||||
.build());
|
||||
}
|
||||
};
|
||||
|
||||
try (InProcessGateway gateway = InProcessGateway.start(service, new AtomicReference<>());
|
||||
MxGatewayClient client = gateway.client("", Duration.ofSeconds(5))) {
|
||||
MxEventStream events = MxGatewaySession.forSessionId(client, "stream-session").streamEvents();
|
||||
|
||||
assertTrue(events.hasNext());
|
||||
assertEquals(1, events.next().getWorkerSequence());
|
||||
events.close();
|
||||
|
||||
assertTrue(cancelled.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void commandFailureKeepsRawReply() throws Exception {
|
||||
TestGatewayService service = new TestGatewayService() {
|
||||
@Override
|
||||
public void invoke(MxCommandRequest request, StreamObserver<MxCommandReply> responseObserver) {
|
||||
responseObserver.onNext(MxCommandReply.newBuilder()
|
||||
.setSessionId(request.getSessionId())
|
||||
.setKind(MxCommandKind.MX_COMMAND_KIND_WRITE)
|
||||
.setProtocolStatus(ProtocolStatus.newBuilder()
|
||||
.setCode(ProtocolStatusCode.PROTOCOL_STATUS_CODE_MXACCESS_FAILURE)
|
||||
.setMessage("MXAccess rejected the write."))
|
||||
.setHresult(-2147220992)
|
||||
.build());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
};
|
||||
|
||||
try (InProcessGateway gateway = InProcessGateway.start(service, new AtomicReference<>());
|
||||
MxGatewayClient client = gateway.client("", Duration.ofSeconds(5))) {
|
||||
MxGatewaySession session = MxGatewaySession.forSessionId(client, "failure-session");
|
||||
|
||||
MxAccessException error = assertThrows(
|
||||
MxAccessException.class,
|
||||
() -> session.write(1, 2, MxValues.int32Value(123), 0));
|
||||
|
||||
assertNotNull(error.reply());
|
||||
assertEquals(-2147220992, error.reply().getHresult());
|
||||
}
|
||||
}
|
||||
|
||||
private static ProtocolStatus ok() {
|
||||
return ProtocolStatus.newBuilder()
|
||||
.setCode(ProtocolStatusCode.PROTOCOL_STATUS_CODE_OK)
|
||||
.build();
|
||||
}
|
||||
|
||||
private abstract static class TestGatewayService extends MxAccessGatewayGrpc.MxAccessGatewayImplBase {
|
||||
@Override
|
||||
public void openSession(OpenSessionRequest request, StreamObserver<OpenSessionReply> responseObserver) {
|
||||
responseObserver.onNext(OpenSessionReply.newBuilder()
|
||||
.setSessionId("session-java")
|
||||
.setProtocolStatus(ok())
|
||||
.build());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void closeSession(CloseSessionRequest request, StreamObserver<CloseSessionReply> responseObserver) {
|
||||
responseObserver.onNext(CloseSessionReply.newBuilder()
|
||||
.setSessionId(request.getSessionId())
|
||||
.setFinalState(SessionState.SESSION_STATE_CLOSED)
|
||||
.setProtocolStatus(ok())
|
||||
.build());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
}
|
||||
|
||||
private record InProcessGateway(Server server, ManagedChannel channel) implements AutoCloseable {
|
||||
static InProcessGateway start(
|
||||
MxAccessGatewayGrpc.MxAccessGatewayImplBase service, AtomicReference<String> authorization)
|
||||
throws Exception {
|
||||
String serverName = "mxgw-java-" + UUID.randomUUID();
|
||||
ServerInterceptor interceptor = new ServerInterceptor() {
|
||||
@Override
|
||||
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
|
||||
ServerCall<ReqT, RespT> call,
|
||||
Metadata headers,
|
||||
ServerCallHandler<ReqT, RespT> next) {
|
||||
authorization.set(headers.get(MxGatewayAuthInterceptor.AUTHORIZATION_HEADER));
|
||||
return next.startCall(call, headers);
|
||||
}
|
||||
};
|
||||
Server server = InProcessServerBuilder.forName(serverName)
|
||||
.directExecutor()
|
||||
.addService(io.grpc.ServerInterceptors.intercept(service, interceptor))
|
||||
.build()
|
||||
.start();
|
||||
ManagedChannel channel = InProcessChannelBuilder.forName(serverName)
|
||||
.directExecutor()
|
||||
.build();
|
||||
return new InProcessGateway(server, channel);
|
||||
}
|
||||
|
||||
MxGatewayClient client(String apiKey, Duration callTimeout) {
|
||||
return new MxGatewayClient(
|
||||
channel,
|
||||
MxGatewayClientOptions.builder()
|
||||
.endpoint("in-process")
|
||||
.apiKey(apiKey)
|
||||
.plaintext(true)
|
||||
.callTimeout(callTimeout)
|
||||
.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
channel.shutdownNow();
|
||||
server.shutdownNow();
|
||||
}
|
||||
}
|
||||
}
|
||||
+136
@@ -0,0 +1,136 @@
|
||||
package com.dohertylan.mxgateway.client;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import com.google.gson.JsonArray;
|
||||
import com.google.gson.JsonObject;
|
||||
import com.google.gson.JsonParser;
|
||||
import com.google.protobuf.util.JsonFormat;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxStatusCategory;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxStatusProxy;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.MxValue;
|
||||
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
final class MxGatewayFixtureTests {
|
||||
@Test
|
||||
void valueFixtureCasesExposeNativeProjectionAndRawMetadata() throws Exception {
|
||||
JsonArray cases = readFixture("values/value-conversion-cases.json").getAsJsonArray("cases");
|
||||
|
||||
for (var element : cases) {
|
||||
JsonObject testCase = element.getAsJsonObject();
|
||||
MxValue.Builder builder = MxValue.newBuilder();
|
||||
JsonFormat.parser().merge(testCase.getAsJsonObject("value").toString(), builder);
|
||||
MxValue value = builder.build();
|
||||
|
||||
assertEquals(testCase.get("expectedKind").getAsString(), lowerCamelKind(value));
|
||||
if ("timestamp.utc".equals(testCase.get("id").getAsString())) {
|
||||
assertEquals(Instant.parse("2026-01-01T00:00:04Z"), MxValues.nativeValue(value));
|
||||
}
|
||||
if ("string-array".equals(testCase.get("id").getAsString())) {
|
||||
assertEquals(List.of("alpha", "beta"), MxValues.nativeValue(value));
|
||||
}
|
||||
if ("raw-fallback.variant".equals(testCase.get("id").getAsString())) {
|
||||
assertEquals("No lossless typed projection exists for this VARIANT.", value.getRawDiagnostic());
|
||||
assertArrayEquals(new byte[] {1, 2, 3, 4, 5}, (byte[]) MxValues.nativeValue(value));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void statusFixtureCasesPreserveRawFields() throws Exception {
|
||||
JsonArray cases = readFixture("statuses/status-conversion-cases.json").getAsJsonArray("cases");
|
||||
|
||||
for (var element : cases) {
|
||||
JsonObject testCase = element.getAsJsonObject();
|
||||
MxStatusProxy.Builder builder = MxStatusProxy.newBuilder();
|
||||
JsonFormat.parser().merge(testCase.getAsJsonObject("status").toString(), builder);
|
||||
MxStatusProxy status = builder.build();
|
||||
|
||||
assertEquals(
|
||||
testCase.getAsJsonObject("status").get("rawCategory").getAsInt(),
|
||||
status.getRawCategory());
|
||||
if ("ok.responding-lmx".equals(testCase.get("id").getAsString())) {
|
||||
assertTrue(MxStatuses.succeeded(status));
|
||||
}
|
||||
if ("security-error.requesting-lmx".equals(testCase.get("id").getAsString())) {
|
||||
assertFalse(MxStatuses.succeeded(status));
|
||||
assertEquals(MxStatusCategory.MX_STATUS_CATEGORY_SECURITY_ERROR, MxStatuses.view(status).category());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void mxAccessFailureFixtureMapsToRichCommandException() throws Exception {
|
||||
MxCommandReply.Builder builder = MxCommandReply.newBuilder();
|
||||
JsonFormat.parser().merge(
|
||||
Files.readString(fixtureRoot().resolve("command-replies/write.mxaccess-failure.reply.json")),
|
||||
builder);
|
||||
MxCommandReply reply = builder.build();
|
||||
|
||||
try {
|
||||
MxGatewayErrors.ensureProtocolSuccess("invoke", reply.getProtocolStatus(), reply);
|
||||
} catch (MxAccessException error) {
|
||||
assertEquals(ProtocolStatusCode.PROTOCOL_STATUS_CODE_MXACCESS_FAILURE, error.protocolStatus().getCode());
|
||||
assertEquals(-2147220992, error.reply().getHresult());
|
||||
assertEquals(2, error.reply().getStatusesCount());
|
||||
return;
|
||||
}
|
||||
|
||||
throw new AssertionError("expected MxAccessException");
|
||||
}
|
||||
|
||||
@Test
|
||||
void grpcAuthErrorsAreClassifiedAndRedacted() {
|
||||
RuntimeException authError = MxGatewayErrors.fromGrpc(
|
||||
"open session",
|
||||
new io.grpc.StatusRuntimeException(io.grpc.Status.UNAUTHENTICATED.withDescription(
|
||||
"invalid API key mxgw_visible_secret")));
|
||||
RuntimeException permissionError = MxGatewayErrors.fromGrpc(
|
||||
"write",
|
||||
new io.grpc.StatusRuntimeException(io.grpc.Status.PERMISSION_DENIED.withDescription(
|
||||
"missing scope mxaccess.write")));
|
||||
|
||||
assertInstanceOf(MxGatewayAuthenticationException.class, authError);
|
||||
assertInstanceOf(MxGatewayAuthorizationException.class, permissionError);
|
||||
assertTrue(authError.getMessage().contains("<redacted>"));
|
||||
assertFalse(authError.getMessage().contains("visible_secret"));
|
||||
}
|
||||
|
||||
private static JsonObject readFixture(String relativePath) throws Exception {
|
||||
return JsonParser.parseString(Files.readString(fixtureRoot().resolve(relativePath))).getAsJsonObject();
|
||||
}
|
||||
|
||||
private static Path fixtureRoot() {
|
||||
Path current = Path.of(System.getProperty("user.dir")).toAbsolutePath();
|
||||
for (Path path = current; path != null; path = path.getParent()) {
|
||||
Path candidate = path.resolve("clients/proto/fixtures/behavior");
|
||||
if (Files.exists(candidate)) {
|
||||
return candidate;
|
||||
}
|
||||
candidate = path.resolve("../proto/fixtures/behavior").normalize();
|
||||
if (Files.exists(candidate)) {
|
||||
return candidate;
|
||||
}
|
||||
}
|
||||
throw new IllegalStateException("could not locate behavior fixtures from " + current);
|
||||
}
|
||||
|
||||
private static String lowerCamelKind(MxValue value) {
|
||||
String[] parts = value.getKindCase().name().toLowerCase().split("_");
|
||||
StringBuilder result = new StringBuilder(parts[0]);
|
||||
for (int index = 1; index < parts.length; index++) {
|
||||
result.append(Character.toUpperCase(parts[index].charAt(0))).append(parts[index].substring(1));
|
||||
}
|
||||
return result.toString();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
pluginManagement {
|
||||
repositories {
|
||||
gradlePluginPortal()
|
||||
mavenCentral()
|
||||
}
|
||||
|
||||
plugins {
|
||||
id 'com.google.protobuf' version '0.9.5'
|
||||
}
|
||||
}
|
||||
|
||||
dependencyResolutionManagement {
|
||||
repositoriesMode.set(RepositoriesMode.FAIL_ON_PROJECT_REPOS)
|
||||
repositories {
|
||||
mavenCentral()
|
||||
}
|
||||
}
|
||||
|
||||
rootProject.name = 'mxaccessgw-java'
|
||||
|
||||
include 'mxgateway-client'
|
||||
include 'mxgateway-cli'
|
||||
+588
@@ -0,0 +1,588 @@
|
||||
package mxaccess_gateway.v1;
|
||||
|
||||
import static io.grpc.MethodDescriptor.generateFullMethodName;
|
||||
|
||||
/**
|
||||
* <pre>
|
||||
* Public client API for MXAccess sessions hosted by the gateway.
|
||||
* </pre>
|
||||
*/
|
||||
@io.grpc.stub.annotations.GrpcGenerated
|
||||
public final class MxAccessGatewayGrpc {
|
||||
|
||||
private MxAccessGatewayGrpc() {}
|
||||
|
||||
public static final java.lang.String SERVICE_NAME = "mxaccess_gateway.v1.MxAccessGateway";
|
||||
|
||||
// Static method descriptors that strictly reflect the proto.
|
||||
private static volatile io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest,
|
||||
mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply> getOpenSessionMethod;
|
||||
|
||||
@io.grpc.stub.annotations.RpcMethod(
|
||||
fullMethodName = SERVICE_NAME + '/' + "OpenSession",
|
||||
requestType = mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest.class,
|
||||
responseType = mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply.class,
|
||||
methodType = io.grpc.MethodDescriptor.MethodType.UNARY)
|
||||
public static io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest,
|
||||
mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply> getOpenSessionMethod() {
|
||||
io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest, mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply> getOpenSessionMethod;
|
||||
if ((getOpenSessionMethod = MxAccessGatewayGrpc.getOpenSessionMethod) == null) {
|
||||
synchronized (MxAccessGatewayGrpc.class) {
|
||||
if ((getOpenSessionMethod = MxAccessGatewayGrpc.getOpenSessionMethod) == null) {
|
||||
MxAccessGatewayGrpc.getOpenSessionMethod = getOpenSessionMethod =
|
||||
io.grpc.MethodDescriptor.<mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest, mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply>newBuilder()
|
||||
.setType(io.grpc.MethodDescriptor.MethodType.UNARY)
|
||||
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "OpenSession"))
|
||||
.setSampledToLocalTracing(true)
|
||||
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
|
||||
mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest.getDefaultInstance()))
|
||||
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
|
||||
mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply.getDefaultInstance()))
|
||||
.setSchemaDescriptor(new MxAccessGatewayMethodDescriptorSupplier("OpenSession"))
|
||||
.build();
|
||||
}
|
||||
}
|
||||
}
|
||||
return getOpenSessionMethod;
|
||||
}
|
||||
|
||||
private static volatile io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest,
|
||||
mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply> getCloseSessionMethod;
|
||||
|
||||
@io.grpc.stub.annotations.RpcMethod(
|
||||
fullMethodName = SERVICE_NAME + '/' + "CloseSession",
|
||||
requestType = mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest.class,
|
||||
responseType = mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply.class,
|
||||
methodType = io.grpc.MethodDescriptor.MethodType.UNARY)
|
||||
public static io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest,
|
||||
mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply> getCloseSessionMethod() {
|
||||
io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest, mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply> getCloseSessionMethod;
|
||||
if ((getCloseSessionMethod = MxAccessGatewayGrpc.getCloseSessionMethod) == null) {
|
||||
synchronized (MxAccessGatewayGrpc.class) {
|
||||
if ((getCloseSessionMethod = MxAccessGatewayGrpc.getCloseSessionMethod) == null) {
|
||||
MxAccessGatewayGrpc.getCloseSessionMethod = getCloseSessionMethod =
|
||||
io.grpc.MethodDescriptor.<mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest, mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply>newBuilder()
|
||||
.setType(io.grpc.MethodDescriptor.MethodType.UNARY)
|
||||
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "CloseSession"))
|
||||
.setSampledToLocalTracing(true)
|
||||
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
|
||||
mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest.getDefaultInstance()))
|
||||
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
|
||||
mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply.getDefaultInstance()))
|
||||
.setSchemaDescriptor(new MxAccessGatewayMethodDescriptorSupplier("CloseSession"))
|
||||
.build();
|
||||
}
|
||||
}
|
||||
}
|
||||
return getCloseSessionMethod;
|
||||
}
|
||||
|
||||
private static volatile io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest,
|
||||
mxaccess_gateway.v1.MxaccessGateway.MxCommandReply> getInvokeMethod;
|
||||
|
||||
@io.grpc.stub.annotations.RpcMethod(
|
||||
fullMethodName = SERVICE_NAME + '/' + "Invoke",
|
||||
requestType = mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest.class,
|
||||
responseType = mxaccess_gateway.v1.MxaccessGateway.MxCommandReply.class,
|
||||
methodType = io.grpc.MethodDescriptor.MethodType.UNARY)
|
||||
public static io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest,
|
||||
mxaccess_gateway.v1.MxaccessGateway.MxCommandReply> getInvokeMethod() {
|
||||
io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest, mxaccess_gateway.v1.MxaccessGateway.MxCommandReply> getInvokeMethod;
|
||||
if ((getInvokeMethod = MxAccessGatewayGrpc.getInvokeMethod) == null) {
|
||||
synchronized (MxAccessGatewayGrpc.class) {
|
||||
if ((getInvokeMethod = MxAccessGatewayGrpc.getInvokeMethod) == null) {
|
||||
MxAccessGatewayGrpc.getInvokeMethod = getInvokeMethod =
|
||||
io.grpc.MethodDescriptor.<mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest, mxaccess_gateway.v1.MxaccessGateway.MxCommandReply>newBuilder()
|
||||
.setType(io.grpc.MethodDescriptor.MethodType.UNARY)
|
||||
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "Invoke"))
|
||||
.setSampledToLocalTracing(true)
|
||||
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
|
||||
mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest.getDefaultInstance()))
|
||||
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
|
||||
mxaccess_gateway.v1.MxaccessGateway.MxCommandReply.getDefaultInstance()))
|
||||
.setSchemaDescriptor(new MxAccessGatewayMethodDescriptorSupplier("Invoke"))
|
||||
.build();
|
||||
}
|
||||
}
|
||||
}
|
||||
return getInvokeMethod;
|
||||
}
|
||||
|
||||
private static volatile io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest,
|
||||
mxaccess_gateway.v1.MxaccessGateway.MxEvent> getStreamEventsMethod;
|
||||
|
||||
@io.grpc.stub.annotations.RpcMethod(
|
||||
fullMethodName = SERVICE_NAME + '/' + "StreamEvents",
|
||||
requestType = mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest.class,
|
||||
responseType = mxaccess_gateway.v1.MxaccessGateway.MxEvent.class,
|
||||
methodType = io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING)
|
||||
public static io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest,
|
||||
mxaccess_gateway.v1.MxaccessGateway.MxEvent> getStreamEventsMethod() {
|
||||
io.grpc.MethodDescriptor<mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest, mxaccess_gateway.v1.MxaccessGateway.MxEvent> getStreamEventsMethod;
|
||||
if ((getStreamEventsMethod = MxAccessGatewayGrpc.getStreamEventsMethod) == null) {
|
||||
synchronized (MxAccessGatewayGrpc.class) {
|
||||
if ((getStreamEventsMethod = MxAccessGatewayGrpc.getStreamEventsMethod) == null) {
|
||||
MxAccessGatewayGrpc.getStreamEventsMethod = getStreamEventsMethod =
|
||||
io.grpc.MethodDescriptor.<mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest, mxaccess_gateway.v1.MxaccessGateway.MxEvent>newBuilder()
|
||||
.setType(io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING)
|
||||
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "StreamEvents"))
|
||||
.setSampledToLocalTracing(true)
|
||||
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
|
||||
mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest.getDefaultInstance()))
|
||||
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
|
||||
mxaccess_gateway.v1.MxaccessGateway.MxEvent.getDefaultInstance()))
|
||||
.setSchemaDescriptor(new MxAccessGatewayMethodDescriptorSupplier("StreamEvents"))
|
||||
.build();
|
||||
}
|
||||
}
|
||||
}
|
||||
return getStreamEventsMethod;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new async stub that supports all call types for the service
|
||||
*/
|
||||
public static MxAccessGatewayStub newStub(io.grpc.Channel channel) {
|
||||
io.grpc.stub.AbstractStub.StubFactory<MxAccessGatewayStub> factory =
|
||||
new io.grpc.stub.AbstractStub.StubFactory<MxAccessGatewayStub>() {
|
||||
@java.lang.Override
|
||||
public MxAccessGatewayStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
|
||||
return new MxAccessGatewayStub(channel, callOptions);
|
||||
}
|
||||
};
|
||||
return MxAccessGatewayStub.newStub(factory, channel);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new blocking-style stub that supports all types of calls on the service
|
||||
*/
|
||||
public static MxAccessGatewayBlockingV2Stub newBlockingV2Stub(
|
||||
io.grpc.Channel channel) {
|
||||
io.grpc.stub.AbstractStub.StubFactory<MxAccessGatewayBlockingV2Stub> factory =
|
||||
new io.grpc.stub.AbstractStub.StubFactory<MxAccessGatewayBlockingV2Stub>() {
|
||||
@java.lang.Override
|
||||
public MxAccessGatewayBlockingV2Stub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
|
||||
return new MxAccessGatewayBlockingV2Stub(channel, callOptions);
|
||||
}
|
||||
};
|
||||
return MxAccessGatewayBlockingV2Stub.newStub(factory, channel);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new blocking-style stub that supports unary and streaming output calls on the service
|
||||
*/
|
||||
public static MxAccessGatewayBlockingStub newBlockingStub(
|
||||
io.grpc.Channel channel) {
|
||||
io.grpc.stub.AbstractStub.StubFactory<MxAccessGatewayBlockingStub> factory =
|
||||
new io.grpc.stub.AbstractStub.StubFactory<MxAccessGatewayBlockingStub>() {
|
||||
@java.lang.Override
|
||||
public MxAccessGatewayBlockingStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
|
||||
return new MxAccessGatewayBlockingStub(channel, callOptions);
|
||||
}
|
||||
};
|
||||
return MxAccessGatewayBlockingStub.newStub(factory, channel);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new ListenableFuture-style stub that supports unary calls on the service
|
||||
*/
|
||||
public static MxAccessGatewayFutureStub newFutureStub(
|
||||
io.grpc.Channel channel) {
|
||||
io.grpc.stub.AbstractStub.StubFactory<MxAccessGatewayFutureStub> factory =
|
||||
new io.grpc.stub.AbstractStub.StubFactory<MxAccessGatewayFutureStub>() {
|
||||
@java.lang.Override
|
||||
public MxAccessGatewayFutureStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
|
||||
return new MxAccessGatewayFutureStub(channel, callOptions);
|
||||
}
|
||||
};
|
||||
return MxAccessGatewayFutureStub.newStub(factory, channel);
|
||||
}
|
||||
|
||||
/**
|
||||
* <pre>
|
||||
* Public client API for MXAccess sessions hosted by the gateway.
|
||||
* </pre>
|
||||
*/
|
||||
public interface AsyncService {
|
||||
|
||||
/**
|
||||
*/
|
||||
default void openSession(mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest request,
|
||||
io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply> responseObserver) {
|
||||
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getOpenSessionMethod(), responseObserver);
|
||||
}
|
||||
|
||||
/**
|
||||
*/
|
||||
default void closeSession(mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest request,
|
||||
io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply> responseObserver) {
|
||||
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getCloseSessionMethod(), responseObserver);
|
||||
}
|
||||
|
||||
/**
|
||||
*/
|
||||
default void invoke(mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest request,
|
||||
io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.MxCommandReply> responseObserver) {
|
||||
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getInvokeMethod(), responseObserver);
|
||||
}
|
||||
|
||||
/**
|
||||
*/
|
||||
default void streamEvents(mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest request,
|
||||
io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.MxEvent> responseObserver) {
|
||||
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getStreamEventsMethod(), responseObserver);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Base class for the server implementation of the service MxAccessGateway.
|
||||
* <pre>
|
||||
* Public client API for MXAccess sessions hosted by the gateway.
|
||||
* </pre>
|
||||
*/
|
||||
public static abstract class MxAccessGatewayImplBase
|
||||
implements io.grpc.BindableService, AsyncService {
|
||||
|
||||
@java.lang.Override public final io.grpc.ServerServiceDefinition bindService() {
|
||||
return MxAccessGatewayGrpc.bindService(this);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A stub to allow clients to do asynchronous rpc calls to service MxAccessGateway.
|
||||
* <pre>
|
||||
* Public client API for MXAccess sessions hosted by the gateway.
|
||||
* </pre>
|
||||
*/
|
||||
public static final class MxAccessGatewayStub
|
||||
extends io.grpc.stub.AbstractAsyncStub<MxAccessGatewayStub> {
|
||||
private MxAccessGatewayStub(
|
||||
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
|
||||
super(channel, callOptions);
|
||||
}
|
||||
|
||||
@java.lang.Override
|
||||
protected MxAccessGatewayStub build(
|
||||
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
|
||||
return new MxAccessGatewayStub(channel, callOptions);
|
||||
}
|
||||
|
||||
/**
|
||||
*/
|
||||
public void openSession(mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest request,
|
||||
io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply> responseObserver) {
|
||||
io.grpc.stub.ClientCalls.asyncUnaryCall(
|
||||
getChannel().newCall(getOpenSessionMethod(), getCallOptions()), request, responseObserver);
|
||||
}
|
||||
|
||||
/**
|
||||
*/
|
||||
public void closeSession(mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest request,
|
||||
io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply> responseObserver) {
|
||||
io.grpc.stub.ClientCalls.asyncUnaryCall(
|
||||
getChannel().newCall(getCloseSessionMethod(), getCallOptions()), request, responseObserver);
|
||||
}
|
||||
|
||||
/**
|
||||
*/
|
||||
public void invoke(mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest request,
|
||||
io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.MxCommandReply> responseObserver) {
|
||||
io.grpc.stub.ClientCalls.asyncUnaryCall(
|
||||
getChannel().newCall(getInvokeMethod(), getCallOptions()), request, responseObserver);
|
||||
}
|
||||
|
||||
/**
|
||||
*/
|
||||
public void streamEvents(mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest request,
|
||||
io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.MxEvent> responseObserver) {
|
||||
io.grpc.stub.ClientCalls.asyncServerStreamingCall(
|
||||
getChannel().newCall(getStreamEventsMethod(), getCallOptions()), request, responseObserver);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A stub to allow clients to do synchronous rpc calls to service MxAccessGateway.
|
||||
* <pre>
|
||||
* Public client API for MXAccess sessions hosted by the gateway.
|
||||
* </pre>
|
||||
*/
|
||||
public static final class MxAccessGatewayBlockingV2Stub
|
||||
extends io.grpc.stub.AbstractBlockingStub<MxAccessGatewayBlockingV2Stub> {
|
||||
private MxAccessGatewayBlockingV2Stub(
|
||||
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
|
||||
super(channel, callOptions);
|
||||
}
|
||||
|
||||
@java.lang.Override
|
||||
protected MxAccessGatewayBlockingV2Stub build(
|
||||
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
|
||||
return new MxAccessGatewayBlockingV2Stub(channel, callOptions);
|
||||
}
|
||||
|
||||
/**
|
||||
*/
|
||||
public mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply openSession(mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest request) throws io.grpc.StatusException {
|
||||
return io.grpc.stub.ClientCalls.blockingV2UnaryCall(
|
||||
getChannel(), getOpenSessionMethod(), getCallOptions(), request);
|
||||
}
|
||||
|
||||
/**
|
||||
*/
|
||||
public mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply closeSession(mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest request) throws io.grpc.StatusException {
|
||||
return io.grpc.stub.ClientCalls.blockingV2UnaryCall(
|
||||
getChannel(), getCloseSessionMethod(), getCallOptions(), request);
|
||||
}
|
||||
|
||||
/**
|
||||
*/
|
||||
public mxaccess_gateway.v1.MxaccessGateway.MxCommandReply invoke(mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest request) throws io.grpc.StatusException {
|
||||
return io.grpc.stub.ClientCalls.blockingV2UnaryCall(
|
||||
getChannel(), getInvokeMethod(), getCallOptions(), request);
|
||||
}
|
||||
|
||||
/**
|
||||
*/
|
||||
@io.grpc.ExperimentalApi("https://github.com/grpc/grpc-java/issues/10918")
|
||||
public io.grpc.stub.BlockingClientCall<?, mxaccess_gateway.v1.MxaccessGateway.MxEvent>
|
||||
streamEvents(mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest request) {
|
||||
return io.grpc.stub.ClientCalls.blockingV2ServerStreamingCall(
|
||||
getChannel(), getStreamEventsMethod(), getCallOptions(), request);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A stub to allow clients to do limited synchronous rpc calls to service MxAccessGateway.
|
||||
* <pre>
|
||||
* Public client API for MXAccess sessions hosted by the gateway.
|
||||
* </pre>
|
||||
*/
|
||||
public static final class MxAccessGatewayBlockingStub
|
||||
extends io.grpc.stub.AbstractBlockingStub<MxAccessGatewayBlockingStub> {
|
||||
private MxAccessGatewayBlockingStub(
|
||||
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
|
||||
super(channel, callOptions);
|
||||
}
|
||||
|
||||
@java.lang.Override
|
||||
protected MxAccessGatewayBlockingStub build(
|
||||
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
|
||||
return new MxAccessGatewayBlockingStub(channel, callOptions);
|
||||
}
|
||||
|
||||
/**
|
||||
*/
|
||||
public mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply openSession(mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest request) {
|
||||
return io.grpc.stub.ClientCalls.blockingUnaryCall(
|
||||
getChannel(), getOpenSessionMethod(), getCallOptions(), request);
|
||||
}
|
||||
|
||||
/**
|
||||
*/
|
||||
public mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply closeSession(mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest request) {
|
||||
return io.grpc.stub.ClientCalls.blockingUnaryCall(
|
||||
getChannel(), getCloseSessionMethod(), getCallOptions(), request);
|
||||
}
|
||||
|
||||
/**
|
||||
*/
|
||||
public mxaccess_gateway.v1.MxaccessGateway.MxCommandReply invoke(mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest request) {
|
||||
return io.grpc.stub.ClientCalls.blockingUnaryCall(
|
||||
getChannel(), getInvokeMethod(), getCallOptions(), request);
|
||||
}
|
||||
|
||||
/**
|
||||
*/
|
||||
public java.util.Iterator<mxaccess_gateway.v1.MxaccessGateway.MxEvent> streamEvents(
|
||||
mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest request) {
|
||||
return io.grpc.stub.ClientCalls.blockingServerStreamingCall(
|
||||
getChannel(), getStreamEventsMethod(), getCallOptions(), request);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A stub to allow clients to do ListenableFuture-style rpc calls to service MxAccessGateway.
|
||||
* <pre>
|
||||
* Public client API for MXAccess sessions hosted by the gateway.
|
||||
* </pre>
|
||||
*/
|
||||
public static final class MxAccessGatewayFutureStub
|
||||
extends io.grpc.stub.AbstractFutureStub<MxAccessGatewayFutureStub> {
|
||||
private MxAccessGatewayFutureStub(
|
||||
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
|
||||
super(channel, callOptions);
|
||||
}
|
||||
|
||||
@java.lang.Override
|
||||
protected MxAccessGatewayFutureStub build(
|
||||
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
|
||||
return new MxAccessGatewayFutureStub(channel, callOptions);
|
||||
}
|
||||
|
||||
/**
|
||||
*/
|
||||
public com.google.common.util.concurrent.ListenableFuture<mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply> openSession(
|
||||
mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest request) {
|
||||
return io.grpc.stub.ClientCalls.futureUnaryCall(
|
||||
getChannel().newCall(getOpenSessionMethod(), getCallOptions()), request);
|
||||
}
|
||||
|
||||
/**
|
||||
*/
|
||||
public com.google.common.util.concurrent.ListenableFuture<mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply> closeSession(
|
||||
mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest request) {
|
||||
return io.grpc.stub.ClientCalls.futureUnaryCall(
|
||||
getChannel().newCall(getCloseSessionMethod(), getCallOptions()), request);
|
||||
}
|
||||
|
||||
/**
|
||||
*/
|
||||
public com.google.common.util.concurrent.ListenableFuture<mxaccess_gateway.v1.MxaccessGateway.MxCommandReply> invoke(
|
||||
mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest request) {
|
||||
return io.grpc.stub.ClientCalls.futureUnaryCall(
|
||||
getChannel().newCall(getInvokeMethod(), getCallOptions()), request);
|
||||
}
|
||||
}
|
||||
|
||||
private static final int METHODID_OPEN_SESSION = 0;
|
||||
private static final int METHODID_CLOSE_SESSION = 1;
|
||||
private static final int METHODID_INVOKE = 2;
|
||||
private static final int METHODID_STREAM_EVENTS = 3;
|
||||
|
||||
private static final class MethodHandlers<Req, Resp> implements
|
||||
io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
|
||||
io.grpc.stub.ServerCalls.ServerStreamingMethod<Req, Resp>,
|
||||
io.grpc.stub.ServerCalls.ClientStreamingMethod<Req, Resp>,
|
||||
io.grpc.stub.ServerCalls.BidiStreamingMethod<Req, Resp> {
|
||||
private final AsyncService serviceImpl;
|
||||
private final int methodId;
|
||||
|
||||
MethodHandlers(AsyncService serviceImpl, int methodId) {
|
||||
this.serviceImpl = serviceImpl;
|
||||
this.methodId = methodId;
|
||||
}
|
||||
|
||||
@java.lang.Override
|
||||
@java.lang.SuppressWarnings("unchecked")
|
||||
public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {
|
||||
switch (methodId) {
|
||||
case METHODID_OPEN_SESSION:
|
||||
serviceImpl.openSession((mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest) request,
|
||||
(io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply>) responseObserver);
|
||||
break;
|
||||
case METHODID_CLOSE_SESSION:
|
||||
serviceImpl.closeSession((mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest) request,
|
||||
(io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply>) responseObserver);
|
||||
break;
|
||||
case METHODID_INVOKE:
|
||||
serviceImpl.invoke((mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest) request,
|
||||
(io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.MxCommandReply>) responseObserver);
|
||||
break;
|
||||
case METHODID_STREAM_EVENTS:
|
||||
serviceImpl.streamEvents((mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest) request,
|
||||
(io.grpc.stub.StreamObserver<mxaccess_gateway.v1.MxaccessGateway.MxEvent>) responseObserver);
|
||||
break;
|
||||
default:
|
||||
throw new AssertionError();
|
||||
}
|
||||
}
|
||||
|
||||
@java.lang.Override
|
||||
@java.lang.SuppressWarnings("unchecked")
|
||||
public io.grpc.stub.StreamObserver<Req> invoke(
|
||||
io.grpc.stub.StreamObserver<Resp> responseObserver) {
|
||||
switch (methodId) {
|
||||
default:
|
||||
throw new AssertionError();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static final io.grpc.ServerServiceDefinition bindService(AsyncService service) {
|
||||
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
|
||||
.addMethod(
|
||||
getOpenSessionMethod(),
|
||||
io.grpc.stub.ServerCalls.asyncUnaryCall(
|
||||
new MethodHandlers<
|
||||
mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest,
|
||||
mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply>(
|
||||
service, METHODID_OPEN_SESSION)))
|
||||
.addMethod(
|
||||
getCloseSessionMethod(),
|
||||
io.grpc.stub.ServerCalls.asyncUnaryCall(
|
||||
new MethodHandlers<
|
||||
mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest,
|
||||
mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply>(
|
||||
service, METHODID_CLOSE_SESSION)))
|
||||
.addMethod(
|
||||
getInvokeMethod(),
|
||||
io.grpc.stub.ServerCalls.asyncUnaryCall(
|
||||
new MethodHandlers<
|
||||
mxaccess_gateway.v1.MxaccessGateway.MxCommandRequest,
|
||||
mxaccess_gateway.v1.MxaccessGateway.MxCommandReply>(
|
||||
service, METHODID_INVOKE)))
|
||||
.addMethod(
|
||||
getStreamEventsMethod(),
|
||||
io.grpc.stub.ServerCalls.asyncServerStreamingCall(
|
||||
new MethodHandlers<
|
||||
mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest,
|
||||
mxaccess_gateway.v1.MxaccessGateway.MxEvent>(
|
||||
service, METHODID_STREAM_EVENTS)))
|
||||
.build();
|
||||
}
|
||||
|
||||
private static abstract class MxAccessGatewayBaseDescriptorSupplier
|
||||
implements io.grpc.protobuf.ProtoFileDescriptorSupplier, io.grpc.protobuf.ProtoServiceDescriptorSupplier {
|
||||
MxAccessGatewayBaseDescriptorSupplier() {}
|
||||
|
||||
@java.lang.Override
|
||||
public com.google.protobuf.Descriptors.FileDescriptor getFileDescriptor() {
|
||||
return mxaccess_gateway.v1.MxaccessGateway.getDescriptor();
|
||||
}
|
||||
|
||||
@java.lang.Override
|
||||
public com.google.protobuf.Descriptors.ServiceDescriptor getServiceDescriptor() {
|
||||
return getFileDescriptor().findServiceByName("MxAccessGateway");
|
||||
}
|
||||
}
|
||||
|
||||
private static final class MxAccessGatewayFileDescriptorSupplier
|
||||
extends MxAccessGatewayBaseDescriptorSupplier {
|
||||
MxAccessGatewayFileDescriptorSupplier() {}
|
||||
}
|
||||
|
||||
private static final class MxAccessGatewayMethodDescriptorSupplier
|
||||
extends MxAccessGatewayBaseDescriptorSupplier
|
||||
implements io.grpc.protobuf.ProtoMethodDescriptorSupplier {
|
||||
private final java.lang.String methodName;
|
||||
|
||||
MxAccessGatewayMethodDescriptorSupplier(java.lang.String methodName) {
|
||||
this.methodName = methodName;
|
||||
}
|
||||
|
||||
@java.lang.Override
|
||||
public com.google.protobuf.Descriptors.MethodDescriptor getMethodDescriptor() {
|
||||
return getServiceDescriptor().findMethodByName(methodName);
|
||||
}
|
||||
}
|
||||
|
||||
private static volatile io.grpc.ServiceDescriptor serviceDescriptor;
|
||||
|
||||
public static io.grpc.ServiceDescriptor getServiceDescriptor() {
|
||||
io.grpc.ServiceDescriptor result = serviceDescriptor;
|
||||
if (result == null) {
|
||||
synchronized (MxAccessGatewayGrpc.class) {
|
||||
result = serviceDescriptor;
|
||||
if (result == null) {
|
||||
serviceDescriptor = result = io.grpc.ServiceDescriptor.newBuilder(SERVICE_NAME)
|
||||
.setSchemaDescriptor(new MxAccessGatewayFileDescriptorSupplier())
|
||||
.addMethod(getOpenSessionMethod())
|
||||
.addMethod(getCloseSessionMethod())
|
||||
.addMethod(getInvokeMethod())
|
||||
.addMethod(getStreamEventsMethod())
|
||||
.build();
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,469 @@
|
||||
{
|
||||
"schemaVersion": 1,
|
||||
"fixtureSet": "mxaccess-gateway-parity-fixture-matrix",
|
||||
"contractName": "mxaccess-gateway",
|
||||
"gatewayProtocolVersion": 1,
|
||||
"workerProtocolVersion": 1,
|
||||
"sourceCaptureRoot": "C:/Users/dohertj2/Desktop/mxaccess/captures",
|
||||
"sourceDocs": [
|
||||
"C:/Users/dohertj2/Desktop/mxaccess/docs/MXAccess-Public-API.md",
|
||||
"C:/Users/dohertj2/Desktop/mxaccess/docs/Current-Sprint-State.md"
|
||||
],
|
||||
"comparisonFormat": {
|
||||
"description": "Each parity run records the same command against direct MXAccess and the gateway-backed worker, then compares raw parity fields instead of client wrapper behavior.",
|
||||
"directMxAccess": {
|
||||
"requiredFields": [
|
||||
"method",
|
||||
"arguments",
|
||||
"returnedValue",
|
||||
"hresult",
|
||||
"exceptionType",
|
||||
"statuses",
|
||||
"events"
|
||||
]
|
||||
},
|
||||
"gatewayResult": {
|
||||
"requiredFields": [
|
||||
"kind",
|
||||
"protocolStatus",
|
||||
"returnValue",
|
||||
"hresult",
|
||||
"statuses",
|
||||
"diagnosticMessage",
|
||||
"events"
|
||||
]
|
||||
},
|
||||
"eventFields": [
|
||||
"family",
|
||||
"serverHandle",
|
||||
"itemHandle",
|
||||
"value",
|
||||
"quality",
|
||||
"sourceTimestamp",
|
||||
"statuses",
|
||||
"workerSequence",
|
||||
"workerTimestamp",
|
||||
"gatewayReceiveTimestamp",
|
||||
"hresult",
|
||||
"rawStatus"
|
||||
],
|
||||
"comparisonKeys": [
|
||||
"hresult",
|
||||
"exceptionType",
|
||||
"returnedValue",
|
||||
"statusArrayShape",
|
||||
"statusRawFields",
|
||||
"eventFamilyOrder",
|
||||
"eventPayloadShape",
|
||||
"valueProjection",
|
||||
"rawFallbackMetadata"
|
||||
]
|
||||
},
|
||||
"methodFixtures": [
|
||||
{
|
||||
"id": "method.register.basic",
|
||||
"method": "Register",
|
||||
"commandKind": "MX_COMMAND_KIND_REGISTER",
|
||||
"status": "planned_fixture",
|
||||
"captureReferences": [
|
||||
"captures/001-register/harness.log",
|
||||
"captures/047-frida-com-proxy-register/harness.log"
|
||||
],
|
||||
"assertions": [
|
||||
"preserve returned server handle in returnValue and RegisterReply",
|
||||
"preserve success HRESULT as 0",
|
||||
"do not emit MXAccess events for register"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "method.unregister.basic",
|
||||
"method": "Unregister",
|
||||
"commandKind": "MX_COMMAND_KIND_UNREGISTER",
|
||||
"status": "planned_fixture",
|
||||
"captureReferences": [
|
||||
"captures/001-register/harness.log",
|
||||
"captures/109-native-post-remove-errors/harness.log"
|
||||
],
|
||||
"assertions": [
|
||||
"preserve void return shape with explicit protocol success",
|
||||
"preserve HRESULT or COM exception details for invalid server handle",
|
||||
"close registered handle only after MXAccess succeeds"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "method.add-item.scalar",
|
||||
"method": "AddItem",
|
||||
"commandKind": "MX_COMMAND_KIND_ADD_ITEM",
|
||||
"status": "planned_fixture",
|
||||
"captureReferences": [
|
||||
"captures/002-add-remove-scalar/harness.log",
|
||||
"captures/006-add-invalid/harness.log"
|
||||
],
|
||||
"assertions": [
|
||||
"preserve returned item handle in returnValue and AddItemReply",
|
||||
"preserve invalid item reference HRESULT/status details",
|
||||
"do not prevalidate item definition in the gateway"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "method.add-item2.context",
|
||||
"method": "AddItem2",
|
||||
"commandKind": "MX_COMMAND_KIND_ADD_ITEM2",
|
||||
"status": "planned_fixture",
|
||||
"captureReferences": [
|
||||
"captures/mxaccess-additem2-testint-context.log",
|
||||
"captures/121-frida-buffered-history-testhistoryvalue-context/harness.log"
|
||||
],
|
||||
"assertions": [
|
||||
"pass item_definition and item_context exactly as supplied",
|
||||
"preserve returned item handle in returnValue and AddItem2Reply",
|
||||
"compare context-bearing reference resolution against direct MXAccess"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "method.remove-item.basic",
|
||||
"method": "RemoveItem",
|
||||
"commandKind": "MX_COMMAND_KIND_REMOVE_ITEM",
|
||||
"status": "planned_fixture",
|
||||
"captureReferences": [
|
||||
"captures/002-add-remove-scalar/harness.log",
|
||||
"captures/109-native-post-remove-errors/harness.log"
|
||||
],
|
||||
"assertions": [
|
||||
"preserve void return shape with explicit protocol success",
|
||||
"preserve post-remove and invalid-handle HRESULT/status behavior",
|
||||
"remove diagnostic handle state only after MXAccess succeeds"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "method.advise.supervisory-data-change",
|
||||
"method": "Advise",
|
||||
"commandKind": "MX_COMMAND_KIND_ADVISE",
|
||||
"status": "planned_fixture",
|
||||
"captureReferences": [
|
||||
"captures/003-subscribe-scalars/harness.log",
|
||||
"captures/058-frida-subscribe-testint/harness.log"
|
||||
],
|
||||
"assertions": [
|
||||
"preserve successful command reply shape",
|
||||
"forward OnDataChange with value, quality, timestamp, and status array",
|
||||
"preserve per-worker event order"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "method.unadvise.basic",
|
||||
"method": "UnAdvise",
|
||||
"commandKind": "MX_COMMAND_KIND_UN_ADVISE",
|
||||
"status": "planned_fixture",
|
||||
"captureReferences": [
|
||||
"captures/058-frida-subscribe-testint/harness.log",
|
||||
"captures/007-subscribe-invalid/harness.log"
|
||||
],
|
||||
"assertions": [
|
||||
"preserve void return shape with explicit protocol success",
|
||||
"preserve invalid item handle HRESULT/status behavior",
|
||||
"do not distinguish plain and supervisory cleanup beyond MXAccess behavior"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "method.advise-supervisory.basic",
|
||||
"method": "AdviseSupervisory",
|
||||
"commandKind": "MX_COMMAND_KIND_ADVISE_SUPERVISORY",
|
||||
"status": "planned_fixture",
|
||||
"captureReferences": [
|
||||
"captures/058-frida-subscribe-testint/harness.log",
|
||||
"captures/105-frida-advise-shortdesc-prebound-fixed/harness.log"
|
||||
],
|
||||
"assertions": [
|
||||
"keep AdviseSupervisory distinct from plain Advise in command kind",
|
||||
"forward native OnDataChange only when MXAccess emits it",
|
||||
"compare supervisory item status arrays without normalization"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "method.add-buffered-item.context",
|
||||
"method": "AddBufferedItem",
|
||||
"commandKind": "MX_COMMAND_KIND_ADD_BUFFERED_ITEM",
|
||||
"status": "planned_fixture",
|
||||
"captureReferences": [
|
||||
"captures/079-frida-add-buffered-advise-testint/harness.log",
|
||||
"captures/120-frida-buffered-history-testhistoryvalue/harness.log",
|
||||
"captures/121-frida-buffered-history-testhistoryvalue-context/harness.log"
|
||||
],
|
||||
"assertions": [
|
||||
"pass item_definition and item_context exactly as supplied",
|
||||
"preserve returned buffered item handle in returnValue and AddBufferedItemReply",
|
||||
"keep buffered registration distinct from normal AddItem2"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "method.set-buffered-update-interval.basic",
|
||||
"method": "SetBufferedUpdateInterval",
|
||||
"commandKind": "MX_COMMAND_KIND_SET_BUFFERED_UPDATE_INTERVAL",
|
||||
"status": "planned_fixture",
|
||||
"captureReferences": [
|
||||
"captures/mxaccess-set-buffered-interval-1000.log",
|
||||
"captures/079-frida-add-buffered-advise-testint/harness.log"
|
||||
],
|
||||
"assertions": [
|
||||
"preserve requested update interval without clamping in the gateway",
|
||||
"preserve void return shape with explicit protocol success",
|
||||
"compare buffered event cadence only in opt-in live runs"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "method.suspend.scan-state",
|
||||
"method": "Suspend",
|
||||
"commandKind": "MX_COMMAND_KIND_SUSPEND",
|
||||
"status": "planned_fixture",
|
||||
"captureReferences": [
|
||||
"captures/077-frida-suspend-advised-scanstate/harness.log",
|
||||
"captures/118-frida-suspend-advised-scanstate-long/harness.log"
|
||||
],
|
||||
"assertions": [
|
||||
"preserve out MxStatus in SuspendReply and repeated statuses",
|
||||
"preserve HRESULT separately from status detail",
|
||||
"do not synthesize OperationComplete if native MXAccess does not raise it"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "method.activate.scan-state",
|
||||
"method": "Activate",
|
||||
"commandKind": "MX_COMMAND_KIND_ACTIVATE",
|
||||
"status": "planned_fixture",
|
||||
"captureReferences": [
|
||||
"captures/078-frida-activate-advised-scanstate/harness.log",
|
||||
"captures/119-frida-activate-advised-scanstate-long/harness.log"
|
||||
],
|
||||
"assertions": [
|
||||
"preserve out MxStatus in ActivateReply and repeated statuses",
|
||||
"preserve HRESULT separately from status detail",
|
||||
"do not synthesize OperationComplete if native MXAccess does not raise it"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "method.write.value-status-matrix",
|
||||
"method": "Write",
|
||||
"commandKind": "MX_COMMAND_KIND_WRITE",
|
||||
"status": "planned_fixture",
|
||||
"captureReferences": [
|
||||
"captures/023-frida-write-test-int-sequence-109-111/harness.log",
|
||||
"captures/024-frida-write-test-bool-sequence/harness.log",
|
||||
"captures/089-frida-write-testint-wrong-type/harness.log",
|
||||
"captures/090-frida-write-invalid-reference/harness.log",
|
||||
"captures/107-native-write-testint-current/harness.log"
|
||||
],
|
||||
"assertions": [
|
||||
"preserve scalar and array value projections plus raw fallback metadata",
|
||||
"preserve wrong-type and invalid-reference HRESULT/status arrays",
|
||||
"forward OnWriteComplete only when native MXAccess emits it"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "method.write2.timestamped",
|
||||
"method": "Write2",
|
||||
"commandKind": "MX_COMMAND_KIND_WRITE2",
|
||||
"status": "planned_fixture",
|
||||
"captureReferences": [
|
||||
"captures/042-frida-write2-test-int-timestamp/harness.log",
|
||||
"captures/066-frida-write2-test-bool-timestamp/harness.log",
|
||||
"captures/075-frida-write2-test-datetime-array-timestamp/harness.log"
|
||||
],
|
||||
"assertions": [
|
||||
"preserve timestamp_value as an MXAccess VARIANT projection",
|
||||
"preserve write value shape and HRESULT/status arrays",
|
||||
"compare timestamped write completion events against direct MXAccess"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "method.write-secured.rejection-gap",
|
||||
"method": "WriteSecured",
|
||||
"commandKind": "MX_COMMAND_KIND_WRITE_SECURED",
|
||||
"status": "documented_gap",
|
||||
"captureReferences": [
|
||||
"captures/036-frida-write-secured-test-int/harness.log",
|
||||
"captures/111-frida-write-secured-auth-protectedvalue/harness.log",
|
||||
"captures/112-frida-write-secured-auth-verified-protectedvalue1/harness.log"
|
||||
],
|
||||
"assertions": [
|
||||
"preserve observed 0x80004021 rejection before a value-bearing NMX body",
|
||||
"preserve current_user_id and verifier_user_id only as command inputs, not logs",
|
||||
"upgrade this gap to planned_fixture when a successful direct WriteSecured path is observed"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "method.write-secured2.authenticated",
|
||||
"method": "WriteSecured2",
|
||||
"commandKind": "MX_COMMAND_KIND_WRITE_SECURED2",
|
||||
"status": "planned_fixture",
|
||||
"captureReferences": [
|
||||
"captures/113-frida-write-secured2-auth-protectedvalue/harness.log",
|
||||
"captures/116-frida-write-secured2-auth-verified-protectedvalue1/harness.log",
|
||||
"captures/117-frida-write-secured2-auth-testint/harness.log"
|
||||
],
|
||||
"assertions": [
|
||||
"preserve authenticated timestamped secured write body shape",
|
||||
"preserve HRESULT/status arrays without logging credential-bearing values",
|
||||
"do not synthesize OnWriteComplete when direct MXAccess does not emit it"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "method.authenticate-user.basic",
|
||||
"method": "AuthenticateUser",
|
||||
"commandKind": "MX_COMMAND_KIND_AUTHENTICATE_USER",
|
||||
"status": "planned_fixture",
|
||||
"captureReferences": [
|
||||
"captures/087-frida-authenticate-administrator-empty/harness.log",
|
||||
"captures/088-frida-authenticate-invalid-empty/harness.log"
|
||||
],
|
||||
"assertions": [
|
||||
"preserve returned user id in returnValue and AuthenticateUserReply",
|
||||
"preserve invalid credential HRESULT/status behavior",
|
||||
"redact verify_user_password from logs and diagnostics"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "method.archestra-user-to-id.basic",
|
||||
"method": "ArchestrAUserToId",
|
||||
"commandKind": "MX_COMMAND_KIND_ARCHESTRA_USER_TO_ID",
|
||||
"status": "planned_fixture",
|
||||
"captureReferences": [
|
||||
"captures/mxaccess-user-map-administrator.log",
|
||||
"captures/mxaccess-user-map-invalid.log"
|
||||
],
|
||||
"assertions": [
|
||||
"preserve returned user id in returnValue and ArchestrAUserToIdReply",
|
||||
"preserve invalid user GUID HRESULT/status behavior",
|
||||
"compare raw mapping behavior without normalizing unknown users"
|
||||
]
|
||||
}
|
||||
],
|
||||
"eventFixtures": [
|
||||
{
|
||||
"id": "event.on-data-change.scalar",
|
||||
"family": "MX_EVENT_FAMILY_ON_DATA_CHANGE",
|
||||
"status": "planned_fixture",
|
||||
"captureReferences": [
|
||||
"captures/003-subscribe-scalars/harness.log",
|
||||
"captures/106-native-subscribe-testint-current/harness.log"
|
||||
],
|
||||
"assertions": [
|
||||
"preserve value, quality, timestamp, status array, and worker sequence"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "event.on-write-complete.status",
|
||||
"family": "MX_EVENT_FAMILY_ON_WRITE_COMPLETE",
|
||||
"status": "planned_fixture",
|
||||
"captureReferences": [
|
||||
"captures/008-write-test-int-same-value/harness.log",
|
||||
"captures/107-native-write-testint-current/harness.log"
|
||||
],
|
||||
"assertions": [
|
||||
"preserve write-complete status array and optional HRESULT"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "event.operation-complete.native-trigger-gap",
|
||||
"family": "MX_EVENT_FAMILY_OPERATION_COMPLETE",
|
||||
"status": "documented_gap",
|
||||
"captureReferences": [
|
||||
"captures/077-frida-suspend-advised-scanstate/harness.log",
|
||||
"captures/118-frida-suspend-advised-scanstate-long/harness.log"
|
||||
],
|
||||
"assertions": [
|
||||
"do not synthesize OperationComplete from Write or OnWriteComplete",
|
||||
"upgrade this gap when a public MXAccess trigger emits event family 3"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "event.on-buffered-data-change.batch-gap",
|
||||
"family": "MX_EVENT_FAMILY_ON_BUFFERED_DATA_CHANGE",
|
||||
"status": "documented_gap",
|
||||
"captureReferences": [
|
||||
"captures/120-frida-buffered-history-testhistoryvalue/harness.log",
|
||||
"captures/122-frida-buffered-history-testhistoryvalue-plainadvise/harness.log"
|
||||
],
|
||||
"assertions": [
|
||||
"preserve raw buffered metadata until a public multi-sample event payload is observed",
|
||||
"upgrade this gap when OnBufferedDataChange batches are captured from MXAccess"
|
||||
]
|
||||
}
|
||||
],
|
||||
"scenarioGroups": [
|
||||
{
|
||||
"id": "invalid_handles",
|
||||
"description": "Invalid server, item, post-remove, and invalid-reference cases keep MXAccess-owned HRESULT and status behavior.",
|
||||
"fixtureIds": [
|
||||
"method.add-item.scalar",
|
||||
"method.remove-item.basic",
|
||||
"method.unadvise.basic",
|
||||
"method.write.value-status-matrix",
|
||||
"method.unregister.basic"
|
||||
],
|
||||
"captureReferences": [
|
||||
"captures/006-add-invalid/harness.log",
|
||||
"captures/007-subscribe-invalid/harness.log",
|
||||
"captures/109-native-post-remove-errors/harness.log",
|
||||
"captures/110-native-invalid-handle-errors/harness.log"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "write_statuses",
|
||||
"description": "Write success, wrong type, invalid reference, scalar arrays, and completion-status cases compare HRESULT, status array, value projection, and event shape.",
|
||||
"fixtureIds": [
|
||||
"method.write.value-status-matrix",
|
||||
"method.write2.timestamped",
|
||||
"event.on-write-complete.status"
|
||||
],
|
||||
"captureReferences": [
|
||||
"captures/089-frida-write-testint-wrong-type/harness.log",
|
||||
"captures/090-frida-write-invalid-reference/harness.log",
|
||||
"captures/091-frida-write-testint-double-type/harness.log",
|
||||
"captures/097-frida-write-bool-array-pattern/harness.log",
|
||||
"captures/107-native-write-testint-current/harness.log"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "secured_writes",
|
||||
"description": "Secured writes include observed WriteSecured rejection and authenticated WriteSecured2 success paths without logging credential-bearing values.",
|
||||
"fixtureIds": [
|
||||
"method.write-secured.rejection-gap",
|
||||
"method.write-secured2.authenticated",
|
||||
"method.authenticate-user.basic"
|
||||
],
|
||||
"captureReferences": [
|
||||
"captures/036-frida-write-secured-test-int/harness.log",
|
||||
"captures/111-frida-write-secured-auth-protectedvalue/harness.log",
|
||||
"captures/113-frida-write-secured2-auth-protectedvalue/harness.log",
|
||||
"captures/117-frida-write-secured2-auth-testint/harness.log"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "add_item_context",
|
||||
"description": "Context-bearing item registration compares AddItem2 and buffered AddBufferedItem argument preservation.",
|
||||
"fixtureIds": [
|
||||
"method.add-item2.context",
|
||||
"method.add-buffered-item.context"
|
||||
],
|
||||
"captureReferences": [
|
||||
"captures/mxaccess-additem2-testint-context.log",
|
||||
"captures/121-frida-buffered-history-testhistoryvalue-context/harness.log"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "buffered_registration",
|
||||
"description": "Buffered registration and interval setup are tracked separately from normal advice until a public buffered data-change batch is captured.",
|
||||
"fixtureIds": [
|
||||
"method.add-buffered-item.context",
|
||||
"method.set-buffered-update-interval.basic",
|
||||
"event.on-buffered-data-change.batch-gap"
|
||||
],
|
||||
"captureReferences": [
|
||||
"captures/079-frida-add-buffered-advise-testint/harness.log",
|
||||
"captures/120-frida-buffered-history-testhistoryvalue/harness.log",
|
||||
"captures/122-frida-buffered-history-testhistoryvalue-plainadvise/harness.log"
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -1,8 +1,8 @@
|
||||
# Python Client
|
||||
|
||||
The Python client package contains generated MXAccess Gateway protobuf
|
||||
bindings, the `mxgateway` package scaffold, and the `mxgw-py` test CLI
|
||||
scaffold. The package uses the shared proto inputs documented in
|
||||
bindings, the async `mxgateway` package, and the `mxgw-py` test CLI. The
|
||||
package uses the shared proto inputs documented in
|
||||
`../../docs/client-proto-generation.md` so gateway and client contracts stay in
|
||||
sync.
|
||||
|
||||
@@ -43,15 +43,65 @@ python -m pytest
|
||||
python -m pip wheel . --no-deps --wheel-dir "$env:TEMP\mxgateway-python-wheel"
|
||||
```
|
||||
|
||||
The scaffold tests import the generated gateway and worker stubs and exercise
|
||||
the deterministic CLI version output.
|
||||
The tests import the generated gateway and worker stubs, run fake async gateway
|
||||
stubs, verify API key metadata, exercise stream cancellation, load shared value
|
||||
and command fixtures, and check deterministic CLI output.
|
||||
|
||||
## Library Usage
|
||||
|
||||
The library is async-first:
|
||||
|
||||
```python
|
||||
from mxgateway import GatewayClient
|
||||
|
||||
async with await GatewayClient.connect(
|
||||
endpoint="localhost:5000",
|
||||
api_key="mxgw_example",
|
||||
plaintext=True,
|
||||
) as client:
|
||||
session = await client.open_session(client_session_name="python-client")
|
||||
try:
|
||||
server_handle = await session.register("python-client")
|
||||
item_handle = await session.add_item(server_handle, "Object.Attribute")
|
||||
await session.advise(server_handle, item_handle)
|
||||
finally:
|
||||
await session.close()
|
||||
```
|
||||
|
||||
`GatewayClient.open_session_raw`, `GatewayClient.invoke_raw`, and
|
||||
`GatewayClient.stream_events_raw` keep the generated protobuf replies and
|
||||
events available for parity tests. `Session` helpers call the method-specific
|
||||
MXAccess commands and preserve raw replies on typed command exceptions.
|
||||
|
||||
Canceling a Python task cancels the client-side gRPC call or stream wait. It
|
||||
does not abort an in-flight MXAccess COM call inside the worker process.
|
||||
|
||||
## Authentication And TLS
|
||||
|
||||
`ClientOptions.api_key` adds this metadata to unary calls and streams:
|
||||
|
||||
```text
|
||||
authorization: Bearer <api-key>
|
||||
```
|
||||
|
||||
The client supports plaintext channels for local development, TLS with system
|
||||
roots, TLS with a custom `ca_file`, and an optional test server name override.
|
||||
API keys are redacted from option repr output and CLI error output.
|
||||
|
||||
## CLI
|
||||
|
||||
The scaffold CLI exposes version information:
|
||||
The CLI emits deterministic JSON for automation:
|
||||
|
||||
```powershell
|
||||
mxgw-py version --json
|
||||
mxgw-py open-session --endpoint localhost:5000 --plaintext --json
|
||||
mxgw-py register --session-id <id> --client-name python-client --json
|
||||
mxgw-py add-item --session-id <id> --server-handle 1 --item Object.Attribute --json
|
||||
mxgw-py advise --session-id <id> --server-handle 1 --item-handle 2 --json
|
||||
mxgw-py stream-events --session-id <id> --max-events 1 --json
|
||||
mxgw-py write --session-id <id> --server-handle 1 --item-handle 2 --type int32 --value 123 --json
|
||||
```
|
||||
|
||||
Additional commands are implemented with the async client/session wrapper work.
|
||||
Use `--api-key` or `--api-key-env MXGATEWAY_API_KEY` to attach API key
|
||||
metadata. `smoke` opens a session, registers, adds an item, advises, streams a
|
||||
bounded event count, and closes the session in a `finally` block.
|
||||
|
||||
@@ -1,5 +1,38 @@
|
||||
"""MXAccess Gateway Python client package."""
|
||||
|
||||
from .auth import ApiKey, auth_metadata
|
||||
from .client import GatewayClient
|
||||
from .errors import (
|
||||
MxAccessError,
|
||||
MxGatewayAuthenticationError,
|
||||
MxGatewayAuthorizationError,
|
||||
MxGatewayCommandError,
|
||||
MxGatewayError,
|
||||
MxGatewaySessionError,
|
||||
MxGatewayTransportError,
|
||||
MxGatewayWorkerError,
|
||||
)
|
||||
from .options import ClientOptions
|
||||
from .session import Session
|
||||
from .values import MxValueView, from_mx_value, to_mx_value
|
||||
from .version import __version__
|
||||
|
||||
__all__ = ["__version__"]
|
||||
__all__ = [
|
||||
"ApiKey",
|
||||
"ClientOptions",
|
||||
"GatewayClient",
|
||||
"MxAccessError",
|
||||
"MxGatewayAuthenticationError",
|
||||
"MxGatewayAuthorizationError",
|
||||
"MxGatewayCommandError",
|
||||
"MxGatewayError",
|
||||
"MxGatewaySessionError",
|
||||
"MxGatewayTransportError",
|
||||
"MxGatewayWorkerError",
|
||||
"MxValueView",
|
||||
"Session",
|
||||
"__version__",
|
||||
"auth_metadata",
|
||||
"from_mx_value",
|
||||
"to_mx_value",
|
||||
]
|
||||
|
||||
@@ -0,0 +1,58 @@
|
||||
"""Authentication metadata helpers for MXAccess Gateway clients."""
|
||||
|
||||
from collections.abc import Sequence
|
||||
from dataclasses import dataclass
|
||||
|
||||
AUTHORIZATION_HEADER = "authorization"
|
||||
REDACTED = "[redacted]"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ApiKey:
|
||||
"""API key wrapper that avoids leaking the secret through repr output."""
|
||||
|
||||
value: str
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if not self.value:
|
||||
raise ValueError("api_key must not be empty")
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"{type(self).__name__}({REDACTED!r})"
|
||||
|
||||
def bearer_value(self) -> str:
|
||||
return f"Bearer {self.value}"
|
||||
|
||||
|
||||
def auth_metadata(api_key: str | ApiKey | None) -> tuple[tuple[str, str], ...]:
|
||||
"""Return gRPC metadata for API key auth."""
|
||||
|
||||
if api_key is None:
|
||||
return ()
|
||||
|
||||
key = api_key.value if isinstance(api_key, ApiKey) else api_key
|
||||
if not key:
|
||||
return ()
|
||||
|
||||
return ((AUTHORIZATION_HEADER, f"Bearer {key}"),)
|
||||
|
||||
|
||||
def merge_metadata(
|
||||
api_key: str | ApiKey | None,
|
||||
metadata: Sequence[tuple[str, str]] | None = None,
|
||||
) -> tuple[tuple[str, str], ...]:
|
||||
"""Merge caller metadata with API key metadata."""
|
||||
|
||||
merged = list(metadata or ())
|
||||
merged.extend(auth_metadata(api_key))
|
||||
return tuple(merged)
|
||||
|
||||
|
||||
def redact_secret(text: str, secrets: Sequence[str | None]) -> str:
|
||||
"""Replace known secret values with a stable redaction marker."""
|
||||
|
||||
redacted = text
|
||||
for secret in secrets:
|
||||
if secret:
|
||||
redacted = redacted.replace(secret, REDACTED)
|
||||
return redacted
|
||||
@@ -0,0 +1,165 @@
|
||||
"""Async MXAccess Gateway client wrapper."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from collections.abc import AsyncIterator, Sequence
|
||||
from typing import Any
|
||||
|
||||
import grpc
|
||||
|
||||
from .auth import merge_metadata
|
||||
from .errors import ensure_protocol_success, map_rpc_error
|
||||
from .generated import mxaccess_gateway_pb2 as pb
|
||||
from .generated import mxaccess_gateway_pb2_grpc as pb_grpc
|
||||
from .options import ClientOptions, create_channel
|
||||
|
||||
|
||||
class GatewayClient:
|
||||
"""Async client for the public MXAccess Gateway gRPC API."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
options: ClientOptions,
|
||||
stub: Any,
|
||||
channel: grpc.aio.Channel | None = None,
|
||||
) -> None:
|
||||
self.options = options
|
||||
self.raw_stub = stub
|
||||
self._channel = channel
|
||||
self._closed = False
|
||||
|
||||
@classmethod
|
||||
async def connect(
|
||||
cls,
|
||||
options: ClientOptions | None = None,
|
||||
*,
|
||||
endpoint: str | None = None,
|
||||
api_key: str | None = None,
|
||||
plaintext: bool = False,
|
||||
ca_file: str | None = None,
|
||||
server_name_override: str | None = None,
|
||||
stub: Any | None = None,
|
||||
) -> "GatewayClient":
|
||||
"""Create a client with either a real async channel or a supplied fake stub."""
|
||||
|
||||
resolved = options or ClientOptions(
|
||||
endpoint=endpoint or "",
|
||||
api_key=api_key,
|
||||
plaintext=plaintext,
|
||||
ca_file=ca_file,
|
||||
server_name_override=server_name_override,
|
||||
)
|
||||
|
||||
if stub is not None:
|
||||
return cls(options=resolved, stub=stub)
|
||||
|
||||
channel = create_channel(resolved)
|
||||
return cls(
|
||||
options=resolved,
|
||||
stub=pb_grpc.MxAccessGatewayStub(channel),
|
||||
channel=channel,
|
||||
)
|
||||
|
||||
async def __aenter__(self) -> "GatewayClient":
|
||||
return self
|
||||
|
||||
async def __aexit__(self, *_exc_info: object) -> None:
|
||||
await self.close()
|
||||
|
||||
async def close(self) -> None:
|
||||
"""Close the owned gRPC channel."""
|
||||
|
||||
if self._closed:
|
||||
return
|
||||
|
||||
self._closed = True
|
||||
if self._channel is not None:
|
||||
await self._channel.close()
|
||||
|
||||
async def open_session(
|
||||
self,
|
||||
request: pb.OpenSessionRequest | None = None,
|
||||
*,
|
||||
requested_backend: str = "",
|
||||
client_session_name: str = "",
|
||||
client_correlation_id: str = "",
|
||||
) -> "Session":
|
||||
"""Open a gateway session and return a high-level session wrapper."""
|
||||
|
||||
from .session import Session
|
||||
|
||||
raw_request = request or pb.OpenSessionRequest(
|
||||
requested_backend=requested_backend,
|
||||
client_session_name=client_session_name,
|
||||
client_correlation_id=client_correlation_id,
|
||||
)
|
||||
reply = await self.open_session_raw(raw_request)
|
||||
return Session(client=self, session_id=reply.session_id, open_reply=reply)
|
||||
|
||||
async def open_session_raw(self, request: pb.OpenSessionRequest) -> pb.OpenSessionReply:
|
||||
reply = await self._unary("open session", self.raw_stub.OpenSession, request)
|
||||
ensure_protocol_success("open session", reply.protocol_status, reply)
|
||||
return reply
|
||||
|
||||
async def close_session_raw(
|
||||
self,
|
||||
request: pb.CloseSessionRequest,
|
||||
) -> pb.CloseSessionReply:
|
||||
reply = await self._unary("close session", self.raw_stub.CloseSession, request)
|
||||
ensure_protocol_success("close session", reply.protocol_status, reply)
|
||||
return reply
|
||||
|
||||
async def invoke_raw(self, request: pb.MxCommandRequest) -> pb.MxCommandReply:
|
||||
reply = await self._unary("invoke", self.raw_stub.Invoke, request)
|
||||
ensure_protocol_success("invoke", reply.protocol_status, reply)
|
||||
return reply
|
||||
|
||||
def stream_events_raw(
|
||||
self,
|
||||
request: pb.StreamEventsRequest,
|
||||
*,
|
||||
metadata: Sequence[tuple[str, str]] | None = None,
|
||||
) -> AsyncIterator[pb.MxEvent]:
|
||||
"""Return an async event iterator and cancel the stream when iteration stops."""
|
||||
|
||||
call = self.raw_stub.StreamEvents(
|
||||
request,
|
||||
metadata=merge_metadata(self.options.api_key, metadata),
|
||||
)
|
||||
return _canceling_iterator(call)
|
||||
|
||||
async def _unary(
|
||||
self,
|
||||
operation: str,
|
||||
method: Any,
|
||||
request: Any,
|
||||
*,
|
||||
metadata: Sequence[tuple[str, str]] | None = None,
|
||||
) -> Any:
|
||||
call = method(
|
||||
request,
|
||||
metadata=merge_metadata(self.options.api_key, metadata),
|
||||
)
|
||||
try:
|
||||
return await call
|
||||
except asyncio.CancelledError:
|
||||
cancel = getattr(call, "cancel", None)
|
||||
if cancel is not None:
|
||||
cancel()
|
||||
raise
|
||||
except grpc.RpcError as error:
|
||||
raise map_rpc_error(operation, error) from error
|
||||
|
||||
|
||||
async def _canceling_iterator(call: Any) -> AsyncIterator[pb.MxEvent]:
|
||||
try:
|
||||
async for event in call:
|
||||
yield event
|
||||
except grpc.RpcError as error:
|
||||
raise map_rpc_error("stream events", error) from error
|
||||
finally:
|
||||
cancel = getattr(call, "cancel", None)
|
||||
if cancel is not None:
|
||||
cancel()
|
||||
@@ -0,0 +1,157 @@
|
||||
"""Typed exception model for MXAccess Gateway Python clients."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
import grpc
|
||||
|
||||
from .generated import mxaccess_gateway_pb2 as pb
|
||||
|
||||
|
||||
class MxGatewayError(Exception):
|
||||
"""Base class for client wrapper errors."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
message: str,
|
||||
*,
|
||||
protocol_status: pb.ProtocolStatus | None = None,
|
||||
raw_reply: Any | None = None,
|
||||
) -> None:
|
||||
super().__init__(message)
|
||||
self.protocol_status = protocol_status
|
||||
self.raw_reply = raw_reply
|
||||
|
||||
|
||||
class MxGatewayTransportError(MxGatewayError):
|
||||
"""Transport-level gRPC failure."""
|
||||
|
||||
|
||||
class MxGatewayAuthenticationError(MxGatewayTransportError):
|
||||
"""Authentication failure reported by gRPC."""
|
||||
|
||||
|
||||
class MxGatewayAuthorizationError(MxGatewayTransportError):
|
||||
"""Authorization failure reported by gRPC."""
|
||||
|
||||
|
||||
class MxGatewaySessionError(MxGatewayError):
|
||||
"""Gateway session failure."""
|
||||
|
||||
|
||||
class MxGatewayWorkerError(MxGatewayError):
|
||||
"""Gateway worker process or protocol failure."""
|
||||
|
||||
|
||||
class MxGatewayCommandError(MxGatewayError):
|
||||
"""Command failure that preserves the raw protobuf reply."""
|
||||
|
||||
|
||||
class MxAccessError(MxGatewayCommandError):
|
||||
"""MXAccess HRESULT or status failure."""
|
||||
|
||||
|
||||
def map_rpc_error(operation: str, error: grpc.RpcError) -> MxGatewayTransportError:
|
||||
"""Map a generated gRPC exception to the client exception hierarchy."""
|
||||
|
||||
code = error.code() if hasattr(error, "code") else None
|
||||
details = error.details() if hasattr(error, "details") else str(error)
|
||||
message = f"{operation} failed: {details}"
|
||||
|
||||
if code == grpc.StatusCode.UNAUTHENTICATED:
|
||||
return MxGatewayAuthenticationError(message)
|
||||
if code == grpc.StatusCode.PERMISSION_DENIED:
|
||||
return MxGatewayAuthorizationError(message)
|
||||
|
||||
return MxGatewayTransportError(message)
|
||||
|
||||
|
||||
def ensure_protocol_success(
|
||||
operation: str,
|
||||
protocol_status: pb.ProtocolStatus | None,
|
||||
raw_reply: Any | None = None,
|
||||
) -> Any | None:
|
||||
"""Raise typed gateway errors for non-OK protocol statuses."""
|
||||
|
||||
code = (
|
||||
protocol_status.code
|
||||
if protocol_status is not None
|
||||
else pb.PROTOCOL_STATUS_CODE_UNSPECIFIED
|
||||
)
|
||||
|
||||
if code in (
|
||||
pb.PROTOCOL_STATUS_CODE_OK,
|
||||
pb.PROTOCOL_STATUS_CODE_MXACCESS_FAILURE,
|
||||
):
|
||||
return raw_reply
|
||||
|
||||
message_text = protocol_status.message if protocol_status else ""
|
||||
message = f"{operation} failed: {message_text or pb.ProtocolStatusCode.Name(code)}"
|
||||
|
||||
if code in (
|
||||
pb.PROTOCOL_STATUS_CODE_SESSION_NOT_FOUND,
|
||||
pb.PROTOCOL_STATUS_CODE_SESSION_NOT_READY,
|
||||
):
|
||||
raise MxGatewaySessionError(
|
||||
message,
|
||||
protocol_status=protocol_status,
|
||||
raw_reply=raw_reply,
|
||||
)
|
||||
|
||||
if code in (
|
||||
pb.PROTOCOL_STATUS_CODE_WORKER_UNAVAILABLE,
|
||||
pb.PROTOCOL_STATUS_CODE_TIMEOUT,
|
||||
pb.PROTOCOL_STATUS_CODE_CANCELED,
|
||||
pb.PROTOCOL_STATUS_CODE_PROTOCOL_VIOLATION,
|
||||
):
|
||||
raise MxGatewayWorkerError(
|
||||
message,
|
||||
protocol_status=protocol_status,
|
||||
raw_reply=raw_reply,
|
||||
)
|
||||
|
||||
raise MxGatewayCommandError(
|
||||
message,
|
||||
protocol_status=protocol_status,
|
||||
raw_reply=raw_reply,
|
||||
)
|
||||
|
||||
|
||||
def ensure_mxaccess_success(operation: str, reply: pb.MxCommandReply) -> pb.MxCommandReply:
|
||||
"""Raise `MxAccessError` when MXAccess returned HRESULT or status failure."""
|
||||
|
||||
status = reply.protocol_status
|
||||
if status.code == pb.PROTOCOL_STATUS_CODE_MXACCESS_FAILURE:
|
||||
raise MxAccessError(
|
||||
_mxaccess_message(operation, reply),
|
||||
protocol_status=status,
|
||||
raw_reply=reply,
|
||||
)
|
||||
|
||||
if reply.HasField("hresult") and reply.hresult < 0:
|
||||
raise MxAccessError(
|
||||
_mxaccess_message(operation, reply),
|
||||
protocol_status=status,
|
||||
raw_reply=reply,
|
||||
)
|
||||
|
||||
for mx_status in reply.statuses:
|
||||
if mx_status.success == 0:
|
||||
raise MxAccessError(
|
||||
_mxaccess_message(operation, reply),
|
||||
protocol_status=status,
|
||||
raw_reply=reply,
|
||||
)
|
||||
|
||||
return reply
|
||||
|
||||
|
||||
def _mxaccess_message(operation: str, reply: pb.MxCommandReply) -> str:
|
||||
status_text = reply.protocol_status.message or "MXAccess command failed"
|
||||
hresult = reply.hresult if reply.HasField("hresult") else None
|
||||
return (
|
||||
f"{operation} failed: {status_text}; "
|
||||
f"session={reply.session_id}; correlation={reply.correlation_id}; "
|
||||
f"hresult={hresult}; statuses={len(reply.statuses)}"
|
||||
)
|
||||
@@ -0,0 +1,59 @@
|
||||
"""Client connection options for the async Python wrapper."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
import grpc
|
||||
|
||||
from .auth import REDACTED, ApiKey
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ClientOptions:
|
||||
"""Connection settings for `GatewayClient.connect`."""
|
||||
|
||||
endpoint: str
|
||||
api_key: str | ApiKey | None = None
|
||||
plaintext: bool = False
|
||||
ca_file: str | None = None
|
||||
server_name_override: str | None = None
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if not self.endpoint:
|
||||
raise ValueError("endpoint must not be empty")
|
||||
|
||||
if self.plaintext and self.ca_file:
|
||||
raise ValueError("ca_file cannot be used with plaintext connections")
|
||||
|
||||
def __repr__(self) -> str:
|
||||
api_key = REDACTED if self.api_key else None
|
||||
return (
|
||||
f"{type(self).__name__}(endpoint={self.endpoint!r}, "
|
||||
f"api_key={api_key!r}, plaintext={self.plaintext!r}, "
|
||||
f"ca_file={self.ca_file!r}, "
|
||||
f"server_name_override={self.server_name_override!r})"
|
||||
)
|
||||
|
||||
|
||||
def create_channel(options: ClientOptions) -> grpc.aio.Channel:
|
||||
"""Create a plaintext or TLS `grpc.aio` channel from client options."""
|
||||
|
||||
channel_options: list[tuple[str, str]] = []
|
||||
if options.server_name_override:
|
||||
channel_options.append(("grpc.ssl_target_name_override", options.server_name_override))
|
||||
|
||||
if options.plaintext:
|
||||
return grpc.aio.insecure_channel(options.endpoint, options=channel_options)
|
||||
|
||||
root_certificates = None
|
||||
if options.ca_file:
|
||||
root_certificates = Path(options.ca_file).read_bytes()
|
||||
|
||||
credentials = grpc.ssl_channel_credentials(root_certificates=root_certificates)
|
||||
return grpc.aio.secure_channel(
|
||||
options.endpoint,
|
||||
credentials,
|
||||
options=channel_options,
|
||||
)
|
||||
@@ -0,0 +1,209 @@
|
||||
"""Async session wrapper for MXAccess Gateway commands."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import AsyncIterator
|
||||
|
||||
from .errors import ensure_mxaccess_success
|
||||
from .generated import mxaccess_gateway_pb2 as pb
|
||||
from .values import MxValueInput, to_mx_value
|
||||
|
||||
|
||||
class Session:
|
||||
"""A single gateway-backed MXAccess session."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
client: "GatewayClient",
|
||||
session_id: str,
|
||||
open_reply: pb.OpenSessionReply | None = None,
|
||||
) -> None:
|
||||
self.client = client
|
||||
self.session_id = session_id
|
||||
self.open_reply = open_reply
|
||||
self._closed = False
|
||||
|
||||
async def __aenter__(self) -> "Session":
|
||||
return self
|
||||
|
||||
async def __aexit__(self, *_exc_info: object) -> None:
|
||||
await self.close()
|
||||
|
||||
async def close(self, *, client_correlation_id: str = "") -> pb.CloseSessionReply:
|
||||
"""Close the gateway session. Repeated calls return a local closed reply."""
|
||||
|
||||
if self._closed:
|
||||
return pb.CloseSessionReply(
|
||||
session_id=self.session_id,
|
||||
final_state=pb.SESSION_STATE_CLOSED,
|
||||
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
|
||||
)
|
||||
|
||||
self._closed = True
|
||||
return await self.client.close_session_raw(
|
||||
pb.CloseSessionRequest(
|
||||
session_id=self.session_id,
|
||||
client_correlation_id=client_correlation_id,
|
||||
),
|
||||
)
|
||||
|
||||
async def invoke(self, command: pb.MxCommand, *, correlation_id: str = "") -> pb.MxCommandReply:
|
||||
"""Invoke a raw command and enforce gateway and MXAccess success."""
|
||||
|
||||
reply = await self.invoke_raw(command, correlation_id=correlation_id)
|
||||
return ensure_mxaccess_success("invoke", reply)
|
||||
|
||||
async def invoke_raw(
|
||||
self,
|
||||
command: pb.MxCommand,
|
||||
*,
|
||||
correlation_id: str = "",
|
||||
) -> pb.MxCommandReply:
|
||||
"""Invoke a raw command and preserve the raw reply."""
|
||||
|
||||
return await self.client.invoke_raw(
|
||||
pb.MxCommandRequest(
|
||||
session_id=self.session_id,
|
||||
client_correlation_id=correlation_id,
|
||||
command=command,
|
||||
),
|
||||
)
|
||||
|
||||
async def register(self, client_name: str, *, correlation_id: str = "") -> int:
|
||||
reply = await self.invoke(
|
||||
pb.MxCommand(
|
||||
kind=pb.MX_COMMAND_KIND_REGISTER,
|
||||
register=pb.RegisterCommand(client_name=client_name),
|
||||
),
|
||||
correlation_id=correlation_id,
|
||||
)
|
||||
return reply.register.server_handle
|
||||
|
||||
async def unregister(self, server_handle: int, *, correlation_id: str = "") -> None:
|
||||
await self.invoke(
|
||||
pb.MxCommand(
|
||||
kind=pb.MX_COMMAND_KIND_UNREGISTER,
|
||||
unregister=pb.UnregisterCommand(server_handle=server_handle),
|
||||
),
|
||||
correlation_id=correlation_id,
|
||||
)
|
||||
|
||||
async def add_item(
|
||||
self,
|
||||
server_handle: int,
|
||||
item_definition: str,
|
||||
*,
|
||||
correlation_id: str = "",
|
||||
) -> int:
|
||||
reply = await self.invoke(
|
||||
pb.MxCommand(
|
||||
kind=pb.MX_COMMAND_KIND_ADD_ITEM,
|
||||
add_item=pb.AddItemCommand(
|
||||
server_handle=server_handle,
|
||||
item_definition=item_definition,
|
||||
),
|
||||
),
|
||||
correlation_id=correlation_id,
|
||||
)
|
||||
return reply.add_item.item_handle
|
||||
|
||||
async def add_item2(
|
||||
self,
|
||||
server_handle: int,
|
||||
item_definition: str,
|
||||
item_context: str,
|
||||
*,
|
||||
correlation_id: str = "",
|
||||
) -> int:
|
||||
reply = await self.invoke(
|
||||
pb.MxCommand(
|
||||
kind=pb.MX_COMMAND_KIND_ADD_ITEM2,
|
||||
add_item2=pb.AddItem2Command(
|
||||
server_handle=server_handle,
|
||||
item_definition=item_definition,
|
||||
item_context=item_context,
|
||||
),
|
||||
),
|
||||
correlation_id=correlation_id,
|
||||
)
|
||||
return reply.add_item2.item_handle
|
||||
|
||||
async def advise(
|
||||
self,
|
||||
server_handle: int,
|
||||
item_handle: int,
|
||||
*,
|
||||
correlation_id: str = "",
|
||||
) -> None:
|
||||
await self.invoke(
|
||||
pb.MxCommand(
|
||||
kind=pb.MX_COMMAND_KIND_ADVISE,
|
||||
advise=pb.AdviseCommand(
|
||||
server_handle=server_handle,
|
||||
item_handle=item_handle,
|
||||
),
|
||||
),
|
||||
correlation_id=correlation_id,
|
||||
)
|
||||
|
||||
async def write(
|
||||
self,
|
||||
server_handle: int,
|
||||
item_handle: int,
|
||||
value: MxValueInput,
|
||||
*,
|
||||
user_id: int = 0,
|
||||
correlation_id: str = "",
|
||||
) -> None:
|
||||
await self.invoke(
|
||||
pb.MxCommand(
|
||||
kind=pb.MX_COMMAND_KIND_WRITE,
|
||||
write=pb.WriteCommand(
|
||||
server_handle=server_handle,
|
||||
item_handle=item_handle,
|
||||
value=to_mx_value(value),
|
||||
user_id=user_id,
|
||||
),
|
||||
),
|
||||
correlation_id=correlation_id,
|
||||
)
|
||||
|
||||
async def write2(
|
||||
self,
|
||||
server_handle: int,
|
||||
item_handle: int,
|
||||
value: MxValueInput,
|
||||
timestamp_value: MxValueInput,
|
||||
*,
|
||||
user_id: int = 0,
|
||||
correlation_id: str = "",
|
||||
) -> None:
|
||||
await self.invoke(
|
||||
pb.MxCommand(
|
||||
kind=pb.MX_COMMAND_KIND_WRITE2,
|
||||
write2=pb.Write2Command(
|
||||
server_handle=server_handle,
|
||||
item_handle=item_handle,
|
||||
value=to_mx_value(value),
|
||||
timestamp_value=to_mx_value(timestamp_value),
|
||||
user_id=user_id,
|
||||
),
|
||||
),
|
||||
correlation_id=correlation_id,
|
||||
)
|
||||
|
||||
def stream_events(
|
||||
self,
|
||||
*,
|
||||
after_worker_sequence: int = 0,
|
||||
) -> AsyncIterator[pb.MxEvent]:
|
||||
return self.client.stream_events_raw(
|
||||
pb.StreamEventsRequest(
|
||||
session_id=self.session_id,
|
||||
after_worker_sequence=after_worker_sequence,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
from .client import GatewayClient # noqa: E402
|
||||
@@ -0,0 +1,234 @@
|
||||
"""MXAccess value conversion helpers."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Sequence
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
|
||||
from google.protobuf.timestamp_pb2 import Timestamp
|
||||
|
||||
from .generated import mxaccess_gateway_pb2 as pb
|
||||
|
||||
|
||||
MxValueInput = bool | int | float | str | datetime | bytes | None | Sequence[Any]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class MxValueView:
|
||||
"""Typed projection of a raw `MxValue` protobuf message."""
|
||||
|
||||
value: Any
|
||||
kind: str
|
||||
raw: pb.MxValue
|
||||
|
||||
|
||||
def to_mx_value(value: MxValueInput, *, data_type: str | None = None) -> pb.MxValue:
|
||||
"""Convert a Python value into the public protobuf `MxValue` union."""
|
||||
|
||||
if isinstance(value, pb.MxValue):
|
||||
return value
|
||||
|
||||
if value is None:
|
||||
return pb.MxValue(
|
||||
data_type=pb.MX_DATA_TYPE_NO_DATA,
|
||||
variant_type="VT_EMPTY",
|
||||
is_null=True,
|
||||
raw_data_type=pb.MX_DATA_TYPE_NO_DATA,
|
||||
)
|
||||
|
||||
if isinstance(value, bool):
|
||||
return pb.MxValue(
|
||||
data_type=_data_type(data_type, pb.MX_DATA_TYPE_BOOLEAN),
|
||||
variant_type="VT_BOOL",
|
||||
bool_value=value,
|
||||
)
|
||||
|
||||
if isinstance(value, int):
|
||||
if -(2**31) <= value <= (2**31 - 1):
|
||||
return pb.MxValue(
|
||||
data_type=_data_type(data_type, pb.MX_DATA_TYPE_INTEGER),
|
||||
variant_type="VT_I4",
|
||||
int32_value=value,
|
||||
)
|
||||
|
||||
return pb.MxValue(
|
||||
data_type=_data_type(data_type, pb.MX_DATA_TYPE_INTEGER),
|
||||
variant_type="VT_I8",
|
||||
int64_value=value,
|
||||
)
|
||||
|
||||
if isinstance(value, float):
|
||||
return pb.MxValue(
|
||||
data_type=_data_type(data_type, pb.MX_DATA_TYPE_DOUBLE),
|
||||
variant_type="VT_R8",
|
||||
double_value=value,
|
||||
)
|
||||
|
||||
if isinstance(value, str):
|
||||
return pb.MxValue(
|
||||
data_type=_data_type(data_type, pb.MX_DATA_TYPE_STRING),
|
||||
variant_type="VT_BSTR",
|
||||
string_value=value,
|
||||
)
|
||||
|
||||
if isinstance(value, datetime):
|
||||
return pb.MxValue(
|
||||
data_type=_data_type(data_type, pb.MX_DATA_TYPE_TIME),
|
||||
variant_type="VT_DATE",
|
||||
timestamp_value=_timestamp_from_datetime(value),
|
||||
)
|
||||
|
||||
if isinstance(value, bytes):
|
||||
return pb.MxValue(
|
||||
data_type=_data_type(data_type, pb.MX_DATA_TYPE_UNKNOWN),
|
||||
variant_type="VT_RECORD",
|
||||
raw_value=value,
|
||||
)
|
||||
|
||||
if isinstance(value, Sequence):
|
||||
return _sequence_to_mx_value(value, data_type=data_type)
|
||||
|
||||
raise TypeError(f"unsupported MxValue input type: {type(value).__name__}")
|
||||
|
||||
|
||||
def from_mx_value(value: pb.MxValue) -> MxValueView:
|
||||
"""Project a protobuf `MxValue` into an idiomatic Python value."""
|
||||
|
||||
kind = value.WhichOneof("kind")
|
||||
if kind is None:
|
||||
return MxValueView(None, "none", value)
|
||||
|
||||
if kind == "timestamp_value":
|
||||
return MxValueView(
|
||||
value.timestamp_value.ToDatetime().replace(tzinfo=timezone.utc),
|
||||
kind,
|
||||
value,
|
||||
)
|
||||
|
||||
if kind == "array_value":
|
||||
return MxValueView(from_mx_array(value.array_value), kind, value)
|
||||
|
||||
return MxValueView(getattr(value, kind), kind, value)
|
||||
|
||||
|
||||
def from_mx_array(array: pb.MxArray) -> list[Any]:
|
||||
"""Project a protobuf `MxArray` into a Python list."""
|
||||
|
||||
kind = array.WhichOneof("values")
|
||||
if kind is None:
|
||||
return []
|
||||
|
||||
values = list(getattr(array, kind).values)
|
||||
if kind == "timestamp_values":
|
||||
return [
|
||||
timestamp.ToDatetime().replace(tzinfo=timezone.utc)
|
||||
for timestamp in values
|
||||
]
|
||||
|
||||
return values
|
||||
|
||||
|
||||
def _sequence_to_mx_value(
|
||||
values: Sequence[Any],
|
||||
*,
|
||||
data_type: str | None,
|
||||
) -> pb.MxValue:
|
||||
sequence = list(values)
|
||||
if not sequence:
|
||||
return pb.MxValue(
|
||||
data_type=_data_type(data_type, pb.MX_DATA_TYPE_UNKNOWN),
|
||||
array_value=pb.MxArray(
|
||||
element_data_type=pb.MX_DATA_TYPE_UNKNOWN,
|
||||
dimensions=[0],
|
||||
),
|
||||
)
|
||||
|
||||
first = sequence[0]
|
||||
dimensions = [len(sequence)]
|
||||
|
||||
if all(isinstance(item, bool) for item in sequence):
|
||||
array = pb.MxArray(
|
||||
element_data_type=pb.MX_DATA_TYPE_BOOLEAN,
|
||||
variant_type="VT_ARRAY|VT_BOOL",
|
||||
dimensions=dimensions,
|
||||
bool_values=pb.BoolArray(values=sequence),
|
||||
)
|
||||
return pb.MxValue(data_type=pb.MX_DATA_TYPE_BOOLEAN, array_value=array)
|
||||
|
||||
if all(isinstance(item, int) and not isinstance(item, bool) for item in sequence):
|
||||
use_int32 = all(-(2**31) <= item <= (2**31 - 1) for item in sequence)
|
||||
if use_int32:
|
||||
array = pb.MxArray(
|
||||
element_data_type=pb.MX_DATA_TYPE_INTEGER,
|
||||
variant_type="VT_ARRAY|VT_I4",
|
||||
dimensions=dimensions,
|
||||
int32_values=pb.Int32Array(values=sequence),
|
||||
)
|
||||
else:
|
||||
array = pb.MxArray(
|
||||
element_data_type=pb.MX_DATA_TYPE_INTEGER,
|
||||
variant_type="VT_ARRAY|VT_I8",
|
||||
dimensions=dimensions,
|
||||
int64_values=pb.Int64Array(values=sequence),
|
||||
)
|
||||
|
||||
return pb.MxValue(data_type=pb.MX_DATA_TYPE_INTEGER, array_value=array)
|
||||
|
||||
if all(isinstance(item, float) for item in sequence):
|
||||
array = pb.MxArray(
|
||||
element_data_type=pb.MX_DATA_TYPE_DOUBLE,
|
||||
variant_type="VT_ARRAY|VT_R8",
|
||||
dimensions=dimensions,
|
||||
double_values=pb.DoubleArray(values=sequence),
|
||||
)
|
||||
return pb.MxValue(data_type=pb.MX_DATA_TYPE_DOUBLE, array_value=array)
|
||||
|
||||
if all(isinstance(item, str) for item in sequence):
|
||||
array = pb.MxArray(
|
||||
element_data_type=pb.MX_DATA_TYPE_STRING,
|
||||
variant_type="VT_ARRAY|VT_BSTR",
|
||||
dimensions=dimensions,
|
||||
string_values=pb.StringArray(values=sequence),
|
||||
)
|
||||
return pb.MxValue(data_type=pb.MX_DATA_TYPE_STRING, array_value=array)
|
||||
|
||||
if all(isinstance(item, datetime) for item in sequence):
|
||||
array = pb.MxArray(
|
||||
element_data_type=pb.MX_DATA_TYPE_TIME,
|
||||
variant_type="VT_ARRAY|VT_DATE",
|
||||
dimensions=dimensions,
|
||||
timestamp_values=pb.TimestampArray(
|
||||
values=[_timestamp_from_datetime(item) for item in sequence],
|
||||
),
|
||||
)
|
||||
return pb.MxValue(data_type=pb.MX_DATA_TYPE_TIME, array_value=array)
|
||||
|
||||
if all(isinstance(item, bytes) for item in sequence):
|
||||
array = pb.MxArray(
|
||||
element_data_type=pb.MX_DATA_TYPE_UNKNOWN,
|
||||
variant_type="VT_ARRAY|VT_VARIANT",
|
||||
dimensions=dimensions,
|
||||
raw_values=pb.RawArray(values=sequence),
|
||||
)
|
||||
return pb.MxValue(data_type=pb.MX_DATA_TYPE_UNKNOWN, array_value=array)
|
||||
|
||||
raise TypeError(
|
||||
"MxValue array inputs must use one supported element type; "
|
||||
f"first element was {type(first).__name__}"
|
||||
)
|
||||
|
||||
|
||||
def _timestamp_from_datetime(value: datetime) -> Timestamp:
|
||||
timestamp = Timestamp()
|
||||
if value.tzinfo is None:
|
||||
value = value.replace(tzinfo=timezone.utc)
|
||||
timestamp.FromDatetime(value.astimezone(timezone.utc))
|
||||
return timestamp
|
||||
|
||||
|
||||
def _data_type(name: str | None, default: int) -> int:
|
||||
if name is None:
|
||||
return default
|
||||
return pb.MxDataType.Value(name)
|
||||
@@ -1,10 +1,24 @@
|
||||
"""CLI scaffold for the MXAccess Gateway Python client."""
|
||||
"""Command line interface for the MXAccess Gateway Python client."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
from collections.abc import Awaitable, Callable
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
|
||||
import click
|
||||
from google.protobuf.json_format import MessageToDict
|
||||
|
||||
from mxgateway import __version__
|
||||
from mxgateway.auth import redact_secret
|
||||
from mxgateway.client import GatewayClient
|
||||
from mxgateway.errors import MxGatewayError
|
||||
from mxgateway.generated import mxaccess_gateway_pb2 as pb
|
||||
from mxgateway.options import ClientOptions
|
||||
from mxgateway.values import MxValueInput
|
||||
|
||||
|
||||
@click.group()
|
||||
@@ -16,14 +30,435 @@ def main() -> None:
|
||||
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
|
||||
def version(output_json: bool) -> None:
|
||||
"""Print client package version information."""
|
||||
|
||||
payload = {
|
||||
"client": "mxgw-py",
|
||||
"package": "mxaccess-gateway-client",
|
||||
"version": __version__,
|
||||
}
|
||||
_emit(payload, output_json=output_json, text=f"mxgw-py {__version__}")
|
||||
|
||||
|
||||
def gateway_options(command: Callable[..., Any]) -> Callable[..., Any]:
|
||||
command = click.option("--endpoint", default="localhost:5000", show_default=True)(command)
|
||||
command = click.option("--api-key", default=None, help="Gateway API key.")(command)
|
||||
command = click.option(
|
||||
"--api-key-env",
|
||||
default=None,
|
||||
help="Environment variable containing the gateway API key.",
|
||||
)(command)
|
||||
command = click.option("--plaintext", is_flag=True, help="Use plaintext gRPC.")(command)
|
||||
command = click.option("--tls", "use_tls", is_flag=True, help="Use TLS gRPC.")(command)
|
||||
command = click.option("--ca-file", default=None, help="Custom root certificate file.")(command)
|
||||
command = click.option(
|
||||
"--server-name-override",
|
||||
default=None,
|
||||
help="TLS server name override for test environments.",
|
||||
)(command)
|
||||
return command
|
||||
|
||||
|
||||
@main.command("open-session")
|
||||
@gateway_options
|
||||
@click.option("--client-name", default="", help="Client session name.")
|
||||
@click.option("--requested-backend", default="", help="Requested backend name.")
|
||||
@click.option("--correlation-id", default="", help="Client correlation id.")
|
||||
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
|
||||
def open_session(**kwargs: Any) -> None:
|
||||
"""Open a gateway session."""
|
||||
|
||||
_run(
|
||||
_open_session(**kwargs),
|
||||
output_json=kwargs["output_json"],
|
||||
secrets=_secrets(kwargs),
|
||||
)
|
||||
|
||||
|
||||
@main.command("close-session")
|
||||
@gateway_options
|
||||
@click.option("--session-id", required=True, help="Gateway session id.")
|
||||
@click.option("--correlation-id", default="", help="Client correlation id.")
|
||||
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
|
||||
def close_session(**kwargs: Any) -> None:
|
||||
"""Close a gateway session."""
|
||||
|
||||
_run(
|
||||
_close_session(**kwargs),
|
||||
output_json=kwargs["output_json"],
|
||||
secrets=_secrets(kwargs),
|
||||
)
|
||||
|
||||
|
||||
@main.command()
|
||||
@gateway_options
|
||||
@click.option("--session-id", required=True, help="Gateway session id.")
|
||||
@click.option("--message", default="ping", show_default=True, help="Ping payload.")
|
||||
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
|
||||
def ping(**kwargs: Any) -> None:
|
||||
"""Send a diagnostic ping command."""
|
||||
|
||||
_run(_ping(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
|
||||
|
||||
|
||||
@main.command()
|
||||
@gateway_options
|
||||
@click.option("--session-id", required=True, help="Gateway session id.")
|
||||
@click.option("--client-name", required=True, help="MXAccess client name.")
|
||||
@click.option("--correlation-id", default="", help="Client correlation id.")
|
||||
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
|
||||
def register(**kwargs: Any) -> None:
|
||||
"""Invoke MXAccess Register."""
|
||||
|
||||
_run(
|
||||
_register(**kwargs),
|
||||
output_json=kwargs["output_json"],
|
||||
secrets=_secrets(kwargs),
|
||||
)
|
||||
|
||||
|
||||
@main.command("add-item")
|
||||
@gateway_options
|
||||
@click.option("--session-id", required=True, help="Gateway session id.")
|
||||
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
|
||||
@click.option("--item", required=True, help="MXAccess item definition.")
|
||||
@click.option("--correlation-id", default="", help="Client correlation id.")
|
||||
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
|
||||
def add_item(**kwargs: Any) -> None:
|
||||
"""Invoke MXAccess AddItem."""
|
||||
|
||||
_run(
|
||||
_add_item(**kwargs),
|
||||
output_json=kwargs["output_json"],
|
||||
secrets=_secrets(kwargs),
|
||||
)
|
||||
|
||||
|
||||
@main.command()
|
||||
@gateway_options
|
||||
@click.option("--session-id", required=True, help="Gateway session id.")
|
||||
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
|
||||
@click.option("--item-handle", required=True, type=int, help="MXAccess item handle.")
|
||||
@click.option("--correlation-id", default="", help="Client correlation id.")
|
||||
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
|
||||
def advise(**kwargs: Any) -> None:
|
||||
"""Invoke MXAccess Advise."""
|
||||
|
||||
_run(_advise(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
|
||||
|
||||
|
||||
@main.command("stream-events")
|
||||
@gateway_options
|
||||
@click.option("--session-id", required=True, help="Gateway session id.")
|
||||
@click.option("--after-worker-sequence", default=0, type=int, show_default=True)
|
||||
@click.option("--max-events", default=1, type=int, show_default=True)
|
||||
@click.option("--timeout", default=5.0, type=float, show_default=True)
|
||||
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
|
||||
def stream_events(**kwargs: Any) -> None:
|
||||
"""Stream a bounded number of events."""
|
||||
|
||||
_run(
|
||||
_stream_events(**kwargs),
|
||||
output_json=kwargs["output_json"],
|
||||
secrets=_secrets(kwargs),
|
||||
)
|
||||
|
||||
|
||||
@main.command()
|
||||
@gateway_options
|
||||
@click.option("--session-id", required=True, help="Gateway session id.")
|
||||
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
|
||||
@click.option("--item-handle", required=True, type=int, help="MXAccess item handle.")
|
||||
@click.option("--type", "value_type", default="string", show_default=True)
|
||||
@click.option("--value", required=True, help="Value to write.")
|
||||
@click.option("--user-id", default=0, type=int, show_default=True)
|
||||
@click.option("--correlation-id", default="", help="Client correlation id.")
|
||||
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
|
||||
def write(**kwargs: Any) -> None:
|
||||
"""Invoke MXAccess Write."""
|
||||
|
||||
_run(_write(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
|
||||
|
||||
|
||||
@main.command()
|
||||
@gateway_options
|
||||
@click.option("--session-id", required=True, help="Gateway session id.")
|
||||
@click.option("--server-handle", required=True, type=int, help="MXAccess server handle.")
|
||||
@click.option("--item-handle", required=True, type=int, help="MXAccess item handle.")
|
||||
@click.option("--type", "value_type", default="string", show_default=True)
|
||||
@click.option("--value", required=True, help="Value to write.")
|
||||
@click.option("--timestamp", required=True, help="ISO-8601 timestamp value.")
|
||||
@click.option("--user-id", default=0, type=int, show_default=True)
|
||||
@click.option("--correlation-id", default="", help="Client correlation id.")
|
||||
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
|
||||
def write2(**kwargs: Any) -> None:
|
||||
"""Invoke MXAccess Write2."""
|
||||
|
||||
_run(_write2(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
|
||||
|
||||
|
||||
@main.command()
|
||||
@gateway_options
|
||||
@click.option("--client-name", default="mxgw-py-smoke", show_default=True)
|
||||
@click.option("--item", required=True, help="MXAccess item definition.")
|
||||
@click.option("--max-events", default=1, type=int, show_default=True)
|
||||
@click.option("--timeout", default=5.0, type=float, show_default=True)
|
||||
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
|
||||
def smoke(**kwargs: Any) -> None:
|
||||
"""Run a bounded open/register/add/advise/stream/close smoke flow."""
|
||||
|
||||
_run(_smoke(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
|
||||
|
||||
|
||||
async def _open_session(**kwargs: Any) -> dict[str, Any]:
|
||||
async with await _connect(kwargs) as client:
|
||||
reply = await client.open_session_raw(
|
||||
pb.OpenSessionRequest(
|
||||
requested_backend=kwargs["requested_backend"],
|
||||
client_session_name=kwargs["client_name"],
|
||||
client_correlation_id=kwargs["correlation_id"],
|
||||
),
|
||||
)
|
||||
return {"sessionId": reply.session_id, "rawReply": _message_dict(reply)}
|
||||
|
||||
|
||||
async def _close_session(**kwargs: Any) -> dict[str, Any]:
|
||||
async with await _connect(kwargs) as client:
|
||||
reply = await client.close_session_raw(
|
||||
pb.CloseSessionRequest(
|
||||
session_id=kwargs["session_id"],
|
||||
client_correlation_id=kwargs["correlation_id"],
|
||||
),
|
||||
)
|
||||
return {"sessionId": reply.session_id, "rawReply": _message_dict(reply)}
|
||||
|
||||
|
||||
async def _ping(**kwargs: Any) -> dict[str, Any]:
|
||||
async with await _connect(kwargs) as client:
|
||||
reply = await client.invoke_raw(
|
||||
pb.MxCommandRequest(
|
||||
session_id=kwargs["session_id"],
|
||||
command=pb.MxCommand(
|
||||
kind=pb.MX_COMMAND_KIND_PING,
|
||||
ping=pb.PingCommand(message=kwargs["message"]),
|
||||
),
|
||||
),
|
||||
)
|
||||
return {"kind": "ping", "rawReply": _message_dict(reply)}
|
||||
|
||||
|
||||
async def _register(**kwargs: Any) -> dict[str, Any]:
|
||||
async with await _connect(kwargs) as client:
|
||||
session = _session(client, kwargs["session_id"])
|
||||
server_handle = await session.register(
|
||||
kwargs["client_name"],
|
||||
correlation_id=kwargs["correlation_id"],
|
||||
)
|
||||
return {"serverHandle": server_handle}
|
||||
|
||||
|
||||
async def _add_item(**kwargs: Any) -> dict[str, Any]:
|
||||
async with await _connect(kwargs) as client:
|
||||
session = _session(client, kwargs["session_id"])
|
||||
item_handle = await session.add_item(
|
||||
kwargs["server_handle"],
|
||||
kwargs["item"],
|
||||
correlation_id=kwargs["correlation_id"],
|
||||
)
|
||||
return {"itemHandle": item_handle}
|
||||
|
||||
|
||||
async def _advise(**kwargs: Any) -> dict[str, Any]:
|
||||
async with await _connect(kwargs) as client:
|
||||
session = _session(client, kwargs["session_id"])
|
||||
await session.advise(
|
||||
kwargs["server_handle"],
|
||||
kwargs["item_handle"],
|
||||
correlation_id=kwargs["correlation_id"],
|
||||
)
|
||||
return {"ok": True}
|
||||
|
||||
|
||||
async def _stream_events(**kwargs: Any) -> dict[str, Any]:
|
||||
async with await _connect(kwargs) as client:
|
||||
session = _session(client, kwargs["session_id"])
|
||||
events = await _collect_events(
|
||||
session.stream_events(after_worker_sequence=kwargs["after_worker_sequence"]),
|
||||
max_events=kwargs["max_events"],
|
||||
timeout=kwargs["timeout"],
|
||||
)
|
||||
return {"events": [_message_dict(event) for event in events]}
|
||||
|
||||
|
||||
async def _write(**kwargs: Any) -> dict[str, Any]:
|
||||
value = _parse_value(kwargs["value"], kwargs["value_type"])
|
||||
async with await _connect(kwargs) as client:
|
||||
session = _session(client, kwargs["session_id"])
|
||||
await session.write(
|
||||
kwargs["server_handle"],
|
||||
kwargs["item_handle"],
|
||||
value,
|
||||
user_id=kwargs["user_id"],
|
||||
correlation_id=kwargs["correlation_id"],
|
||||
)
|
||||
return {"ok": True}
|
||||
|
||||
|
||||
async def _write2(**kwargs: Any) -> dict[str, Any]:
|
||||
value = _parse_value(kwargs["value"], kwargs["value_type"])
|
||||
timestamp = _parse_datetime(kwargs["timestamp"])
|
||||
async with await _connect(kwargs) as client:
|
||||
session = _session(client, kwargs["session_id"])
|
||||
await session.write2(
|
||||
kwargs["server_handle"],
|
||||
kwargs["item_handle"],
|
||||
value,
|
||||
timestamp,
|
||||
user_id=kwargs["user_id"],
|
||||
correlation_id=kwargs["correlation_id"],
|
||||
)
|
||||
return {"ok": True}
|
||||
|
||||
|
||||
async def _smoke(**kwargs: Any) -> dict[str, Any]:
|
||||
async with await _connect(kwargs) as client:
|
||||
session = await client.open_session(client_session_name=kwargs["client_name"])
|
||||
closed = False
|
||||
try:
|
||||
server_handle = await session.register(kwargs["client_name"])
|
||||
item_handle = await session.add_item(server_handle, kwargs["item"])
|
||||
await session.advise(server_handle, item_handle)
|
||||
events = await _collect_events(
|
||||
session.stream_events(),
|
||||
max_events=kwargs["max_events"],
|
||||
timeout=kwargs["timeout"],
|
||||
)
|
||||
return {
|
||||
"sessionId": session.session_id,
|
||||
"serverHandle": server_handle,
|
||||
"itemHandle": item_handle,
|
||||
"events": [_message_dict(event) for event in events],
|
||||
}
|
||||
finally:
|
||||
if not closed:
|
||||
await session.close()
|
||||
|
||||
|
||||
async def _connect(kwargs: dict[str, Any]) -> GatewayClient:
|
||||
api_key = kwargs.get("api_key") or _api_key_from_env(kwargs.get("api_key_env"))
|
||||
return await GatewayClient.connect(
|
||||
ClientOptions(
|
||||
endpoint=kwargs["endpoint"],
|
||||
api_key=api_key,
|
||||
plaintext=_use_plaintext(kwargs),
|
||||
ca_file=kwargs.get("ca_file"),
|
||||
server_name_override=kwargs.get("server_name_override"),
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def _session(client: GatewayClient, session_id: str):
|
||||
from mxgateway.session import Session
|
||||
|
||||
return Session(client=client, session_id=session_id)
|
||||
|
||||
|
||||
def _use_plaintext(kwargs: dict[str, Any]) -> bool:
|
||||
if kwargs.get("use_tls"):
|
||||
return False
|
||||
if kwargs.get("plaintext"):
|
||||
return True
|
||||
return kwargs["endpoint"].startswith("localhost:") or kwargs["endpoint"].startswith("127.0.0.1:")
|
||||
|
||||
|
||||
def _api_key_from_env(name: str | None) -> str | None:
|
||||
if not name:
|
||||
return None
|
||||
return os.environ.get(name)
|
||||
|
||||
|
||||
def _secrets(kwargs: dict[str, Any]) -> list[str | None]:
|
||||
return [
|
||||
kwargs.get("api_key"),
|
||||
_api_key_from_env(kwargs.get("api_key_env")),
|
||||
]
|
||||
|
||||
|
||||
def _run(
|
||||
awaitable: Awaitable[dict[str, Any]],
|
||||
*,
|
||||
output_json: bool,
|
||||
secrets: list[str | None],
|
||||
) -> None:
|
||||
try:
|
||||
payload = asyncio.run(awaitable)
|
||||
except MxGatewayError as error:
|
||||
raise click.ClickException(redact_secret(str(error), secrets)) from error
|
||||
|
||||
_emit(payload, output_json=output_json)
|
||||
|
||||
|
||||
def _emit(
|
||||
payload: dict[str, Any],
|
||||
*,
|
||||
output_json: bool,
|
||||
text: str | None = None,
|
||||
) -> None:
|
||||
if output_json:
|
||||
click.echo(json.dumps(payload, sort_keys=True))
|
||||
return
|
||||
|
||||
click.echo(f"mxgw-py {__version__}")
|
||||
click.echo(text or json.dumps(payload, sort_keys=True))
|
||||
|
||||
|
||||
async def _collect_events(
|
||||
events: Any,
|
||||
*,
|
||||
max_events: int,
|
||||
timeout: float,
|
||||
) -> list[pb.MxEvent]:
|
||||
collected: list[pb.MxEvent] = []
|
||||
iterator = events.__aiter__()
|
||||
try:
|
||||
while len(collected) < max_events:
|
||||
collected.append(await asyncio.wait_for(iterator.__anext__(), timeout=timeout))
|
||||
except StopAsyncIteration:
|
||||
pass
|
||||
finally:
|
||||
close = getattr(iterator, "aclose", None)
|
||||
if close is not None:
|
||||
await close()
|
||||
return collected
|
||||
|
||||
|
||||
def _parse_value(raw_value: str, value_type: str) -> MxValueInput:
|
||||
normalized = value_type.lower()
|
||||
if normalized == "bool":
|
||||
return raw_value.lower() in ("1", "true", "yes", "on")
|
||||
if normalized in ("int", "int32", "int64"):
|
||||
return int(raw_value)
|
||||
if normalized in ("float", "double"):
|
||||
return float(raw_value)
|
||||
if normalized in ("time", "timestamp"):
|
||||
return _parse_datetime(raw_value)
|
||||
if normalized == "raw":
|
||||
return raw_value.encode("utf-8")
|
||||
if normalized == "string":
|
||||
return raw_value
|
||||
raise click.BadParameter(f"unsupported value type: {value_type}", param_hint="--type")
|
||||
|
||||
|
||||
def _parse_datetime(raw_value: str) -> datetime:
|
||||
if raw_value.endswith("Z"):
|
||||
raw_value = raw_value[:-1] + "+00:00"
|
||||
parsed = datetime.fromisoformat(raw_value)
|
||||
if parsed.tzinfo is None:
|
||||
parsed = parsed.replace(tzinfo=timezone.utc)
|
||||
return parsed
|
||||
|
||||
|
||||
def _message_dict(message: Any) -> dict[str, Any]:
|
||||
return MessageToDict(
|
||||
message,
|
||||
preserving_proto_field_name=False,
|
||||
use_integers_for_enums=False,
|
||||
)
|
||||
|
||||
@@ -0,0 +1,103 @@
|
||||
"""Tests for auth metadata and connection options."""
|
||||
|
||||
import pytest
|
||||
|
||||
from mxgateway.auth import REDACTED, ApiKey, auth_metadata, redact_secret
|
||||
from mxgateway import options as options_module
|
||||
from mxgateway.options import ClientOptions, create_channel
|
||||
|
||||
|
||||
def test_auth_metadata_adds_bearer_api_key() -> None:
|
||||
assert auth_metadata("mxgw_test_secret") == (
|
||||
("authorization", "Bearer mxgw_test_secret"),
|
||||
)
|
||||
|
||||
|
||||
def test_api_key_repr_is_redacted() -> None:
|
||||
api_key = ApiKey("mxgw_test_secret")
|
||||
|
||||
assert "mxgw_test_secret" not in repr(api_key)
|
||||
assert REDACTED in repr(api_key)
|
||||
|
||||
|
||||
def test_redact_secret_replaces_known_values() -> None:
|
||||
redacted = redact_secret(
|
||||
"authorization failed for mxgw_test_secret",
|
||||
["mxgw_test_secret"],
|
||||
)
|
||||
|
||||
assert redacted == f"authorization failed for {REDACTED}"
|
||||
|
||||
|
||||
def test_client_options_reject_plaintext_with_ca_file() -> None:
|
||||
with pytest.raises(ValueError, match="ca_file"):
|
||||
ClientOptions(
|
||||
endpoint="localhost:5000",
|
||||
plaintext=True,
|
||||
ca_file="ca.pem",
|
||||
)
|
||||
|
||||
|
||||
def test_client_options_repr_redacts_api_key() -> None:
|
||||
options = ClientOptions(endpoint="localhost:5000", api_key="mxgw_test_secret")
|
||||
|
||||
assert "mxgw_test_secret" not in repr(options)
|
||||
assert REDACTED in repr(options)
|
||||
|
||||
|
||||
def test_create_channel_uses_plaintext_channel(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
calls: list[tuple[str, object]] = []
|
||||
|
||||
def fake_insecure_channel(endpoint: str, *, options: object) -> str:
|
||||
calls.append((endpoint, options))
|
||||
return "plain-channel"
|
||||
|
||||
monkeypatch.setattr(
|
||||
options_module.grpc.aio,
|
||||
"insecure_channel",
|
||||
fake_insecure_channel,
|
||||
)
|
||||
|
||||
channel = create_channel(ClientOptions(endpoint="localhost:5000", plaintext=True))
|
||||
|
||||
assert channel == "plain-channel"
|
||||
assert calls == [("localhost:5000", [])]
|
||||
|
||||
|
||||
def test_create_channel_uses_tls_channel(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
calls: list[tuple[str, object, object]] = []
|
||||
|
||||
def fake_credentials(*, root_certificates: object) -> str:
|
||||
assert root_certificates is None
|
||||
return "creds"
|
||||
|
||||
def fake_secure_channel(endpoint: str, credentials: object, *, options: object) -> str:
|
||||
calls.append((endpoint, credentials, options))
|
||||
return "tls-channel"
|
||||
|
||||
monkeypatch.setattr(
|
||||
options_module.grpc,
|
||||
"ssl_channel_credentials",
|
||||
fake_credentials,
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
options_module.grpc.aio,
|
||||
"secure_channel",
|
||||
fake_secure_channel,
|
||||
)
|
||||
|
||||
channel = create_channel(
|
||||
ClientOptions(
|
||||
endpoint="gateway.example:5001",
|
||||
server_name_override="gateway.test",
|
||||
),
|
||||
)
|
||||
|
||||
assert channel == "tls-channel"
|
||||
assert calls == [
|
||||
(
|
||||
"gateway.example:5001",
|
||||
"creds",
|
||||
[("grpc.ssl_target_name_override", "gateway.test")],
|
||||
),
|
||||
]
|
||||
@@ -1,4 +1,4 @@
|
||||
"""Tests for the Python CLI scaffold."""
|
||||
"""Tests for the Python CLI."""
|
||||
|
||||
import json
|
||||
|
||||
@@ -19,3 +19,50 @@ def test_version_json_is_deterministic() -> None:
|
||||
"package": "mxaccess-gateway-client",
|
||||
"version": __version__,
|
||||
}
|
||||
|
||||
|
||||
def test_write_parser_rejects_unknown_value_type() -> None:
|
||||
runner = CliRunner()
|
||||
|
||||
result = runner.invoke(
|
||||
main,
|
||||
[
|
||||
"write",
|
||||
"--session-id",
|
||||
"session-1",
|
||||
"--server-handle",
|
||||
"12",
|
||||
"--item-handle",
|
||||
"34",
|
||||
"--type",
|
||||
"unsupported",
|
||||
"--value",
|
||||
"123",
|
||||
"--api-key",
|
||||
"mxgw_test_secret",
|
||||
"--json",
|
||||
],
|
||||
)
|
||||
|
||||
assert result.exit_code != 0
|
||||
assert "unsupported value type" in result.output
|
||||
|
||||
|
||||
def test_cli_error_output_redacts_api_key() -> None:
|
||||
runner = CliRunner()
|
||||
|
||||
result = runner.invoke(
|
||||
main,
|
||||
[
|
||||
"open-session",
|
||||
"--endpoint",
|
||||
"127.0.0.1:1",
|
||||
"--api-key",
|
||||
"mxgw_test_secret",
|
||||
"--plaintext",
|
||||
"--json",
|
||||
],
|
||||
)
|
||||
|
||||
assert result.exit_code != 0
|
||||
assert "mxgw_test_secret" not in result.output
|
||||
|
||||
@@ -0,0 +1,225 @@
|
||||
"""Tests for the async client and session wrappers."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
from mxgateway import ClientOptions, GatewayClient, MxAccessError
|
||||
from mxgateway.generated import mxaccess_gateway_pb2 as pb
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_session_helpers_send_auth_metadata_and_preserve_raw_replies() -> None:
|
||||
stub = FakeGatewayStub()
|
||||
client = await GatewayClient.connect(
|
||||
ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True),
|
||||
stub=stub,
|
||||
)
|
||||
|
||||
session = await client.open_session(client_session_name="pytest")
|
||||
server_handle = await session.register("pytest-client")
|
||||
item_handle = await session.add_item(server_handle, "Object.Attribute")
|
||||
await session.advise(server_handle, item_handle)
|
||||
|
||||
assert session.session_id == "session-1"
|
||||
assert server_handle == 12
|
||||
assert item_handle == 34
|
||||
assert stub.open_session.metadata == (("authorization", "Bearer mxgw_test_secret"),)
|
||||
assert stub.invoke.requests[0].command.register.client_name == "pytest-client"
|
||||
assert stub.invoke.requests[1].command.add_item.item_definition == "Object.Attribute"
|
||||
assert stub.invoke.requests[2].command.advise.item_handle == 34
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_mxaccess_error_preserves_raw_reply() -> None:
|
||||
stub = FakeGatewayStub()
|
||||
failure_reply = pb.MxCommandReply(
|
||||
session_id="session-1",
|
||||
kind=pb.MX_COMMAND_KIND_WRITE,
|
||||
protocol_status=pb.ProtocolStatus(
|
||||
code=pb.PROTOCOL_STATUS_CODE_MXACCESS_FAILURE,
|
||||
message="MXAccess rejected write.",
|
||||
),
|
||||
hresult=-1,
|
||||
)
|
||||
stub.invoke.replies = [failure_reply]
|
||||
client = await GatewayClient.connect(
|
||||
ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True),
|
||||
stub=stub,
|
||||
)
|
||||
session = await client.open_session()
|
||||
|
||||
with pytest.raises(MxAccessError) as captured:
|
||||
await session.write(12, 34, 123)
|
||||
|
||||
assert captured.value.raw_reply is failure_reply
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stream_events_cancels_underlying_call_when_closed() -> None:
|
||||
stream = FakeStream(
|
||||
[
|
||||
pb.MxEvent(
|
||||
session_id="session-1",
|
||||
worker_sequence=1,
|
||||
family=pb.MX_EVENT_FAMILY_ON_DATA_CHANGE,
|
||||
),
|
||||
],
|
||||
)
|
||||
stub = FakeGatewayStub(stream=stream)
|
||||
client = await GatewayClient.connect(
|
||||
ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True),
|
||||
stub=stub,
|
||||
)
|
||||
session = await client.open_session()
|
||||
|
||||
events = session.stream_events()
|
||||
first = await anext(events)
|
||||
await events.aclose()
|
||||
|
||||
assert first.worker_sequence == 1
|
||||
assert stream.cancelled
|
||||
assert stub.stream_metadata == (("authorization", "Bearer mxgw_test_secret"),)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_unary_task_cancellation_reaches_fake_call() -> None:
|
||||
blocking = BlockingCancellableUnary()
|
||||
stub = FakeGatewayStub()
|
||||
stub.OpenSession = blocking
|
||||
client = await GatewayClient.connect(
|
||||
ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True),
|
||||
stub=stub,
|
||||
)
|
||||
|
||||
task = asyncio.create_task(client.open_session())
|
||||
await blocking.started.wait()
|
||||
task.cancel()
|
||||
|
||||
with pytest.raises(asyncio.CancelledError):
|
||||
await task
|
||||
|
||||
assert blocking.call is not None
|
||||
assert blocking.call.cancelled
|
||||
|
||||
|
||||
class FakeGatewayStub:
|
||||
def __init__(self, stream: "FakeStream | None" = None) -> None:
|
||||
self.open_session = FakeUnary(
|
||||
[
|
||||
pb.OpenSessionReply(
|
||||
session_id="session-1",
|
||||
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
|
||||
),
|
||||
],
|
||||
)
|
||||
self.close_session = FakeUnary(
|
||||
[
|
||||
pb.CloseSessionReply(
|
||||
session_id="session-1",
|
||||
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
|
||||
),
|
||||
],
|
||||
)
|
||||
self.invoke = FakeUnary(
|
||||
[
|
||||
pb.MxCommandReply(
|
||||
session_id="session-1",
|
||||
kind=pb.MX_COMMAND_KIND_REGISTER,
|
||||
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
|
||||
register=pb.RegisterReply(server_handle=12),
|
||||
),
|
||||
pb.MxCommandReply(
|
||||
session_id="session-1",
|
||||
kind=pb.MX_COMMAND_KIND_ADD_ITEM,
|
||||
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
|
||||
add_item=pb.AddItemReply(item_handle=34),
|
||||
),
|
||||
pb.MxCommandReply(
|
||||
session_id="session-1",
|
||||
kind=pb.MX_COMMAND_KIND_ADVISE,
|
||||
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
|
||||
),
|
||||
],
|
||||
)
|
||||
self.OpenSession = self.open_session
|
||||
self.CloseSession = self.close_session
|
||||
self.Invoke = self.invoke
|
||||
self._stream = stream or FakeStream([])
|
||||
self.stream_metadata: tuple[tuple[str, str], ...] | None = None
|
||||
|
||||
def StreamEvents(
|
||||
self,
|
||||
request: pb.StreamEventsRequest,
|
||||
*,
|
||||
metadata: tuple[tuple[str, str], ...],
|
||||
) -> "FakeStream":
|
||||
self.stream_request = request
|
||||
self.stream_metadata = metadata
|
||||
return self._stream
|
||||
|
||||
|
||||
class FakeUnary:
|
||||
def __init__(self, replies: list[Any]) -> None:
|
||||
self.replies = replies
|
||||
self.requests: list[Any] = []
|
||||
self.metadata: tuple[tuple[str, str], ...] | None = None
|
||||
|
||||
async def __call__(
|
||||
self,
|
||||
request: Any,
|
||||
*,
|
||||
metadata: tuple[tuple[str, str], ...],
|
||||
) -> Any:
|
||||
self.requests.append(request)
|
||||
self.metadata = metadata
|
||||
return self.replies.pop(0)
|
||||
|
||||
|
||||
class BlockingCancellableUnary:
|
||||
def __init__(self) -> None:
|
||||
self.started = asyncio.Event()
|
||||
self.call: BlockingCall | None = None
|
||||
|
||||
def __call__(self, *_args: Any, **_kwargs: Any) -> "BlockingCall":
|
||||
self.call = BlockingCall(self.started)
|
||||
return self.call
|
||||
|
||||
|
||||
class BlockingCall:
|
||||
def __init__(self, started: asyncio.Event) -> None:
|
||||
self.started = started
|
||||
self.cancelled = False
|
||||
|
||||
def __await__(self):
|
||||
return self._wait().__await__()
|
||||
|
||||
async def _wait(self) -> Any:
|
||||
self.started.set()
|
||||
try:
|
||||
await asyncio.Event().wait()
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
|
||||
def cancel(self) -> None:
|
||||
self.cancelled = True
|
||||
|
||||
|
||||
class FakeStream:
|
||||
def __init__(self, events: list[pb.MxEvent]) -> None:
|
||||
self._events = events
|
||||
self.cancelled = False
|
||||
|
||||
def __aiter__(self) -> "FakeStream":
|
||||
return self
|
||||
|
||||
async def __anext__(self) -> pb.MxEvent:
|
||||
if not self._events:
|
||||
await asyncio.sleep(3600)
|
||||
return self._events.pop(0)
|
||||
|
||||
def cancel(self) -> None:
|
||||
self.cancelled = True
|
||||
@@ -0,0 +1,49 @@
|
||||
"""Tests for typed command error mapping."""
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from google.protobuf.json_format import ParseDict
|
||||
|
||||
from mxgateway.errors import ensure_mxaccess_success, ensure_protocol_success
|
||||
from mxgateway import MxAccessError, MxGatewaySessionError
|
||||
from mxgateway.generated import mxaccess_gateway_pb2 as pb
|
||||
|
||||
FIXTURE_ROOT = Path(__file__).resolve().parents[2] / "proto" / "fixtures" / "behavior"
|
||||
|
||||
|
||||
def test_register_fixture_is_protocol_and_mxaccess_success() -> None:
|
||||
reply = _load_reply("command-replies/register.ok.reply.json")
|
||||
|
||||
assert ensure_protocol_success("register", reply.protocol_status, reply) is reply
|
||||
assert ensure_mxaccess_success("register", reply) is reply
|
||||
|
||||
|
||||
def test_write_failure_fixture_preserves_raw_reply() -> None:
|
||||
reply = _load_reply("command-replies/write.mxaccess-failure.reply.json")
|
||||
|
||||
assert ensure_protocol_success("write", reply.protocol_status, reply) is reply
|
||||
with pytest.raises(MxAccessError) as captured:
|
||||
ensure_mxaccess_success("write", reply)
|
||||
|
||||
assert captured.value.raw_reply is reply
|
||||
assert captured.value.raw_reply.hresult == -2147220992
|
||||
assert len(captured.value.raw_reply.statuses) == 2
|
||||
|
||||
|
||||
def test_session_status_maps_to_session_error() -> None:
|
||||
status = pb.ProtocolStatus(
|
||||
code=pb.PROTOCOL_STATUS_CODE_SESSION_NOT_FOUND,
|
||||
message="session missing",
|
||||
)
|
||||
|
||||
with pytest.raises(MxGatewaySessionError) as captured:
|
||||
ensure_protocol_success("invoke", status)
|
||||
|
||||
assert captured.value.protocol_status is status
|
||||
|
||||
|
||||
def _load_reply(name: str) -> pb.MxCommandReply:
|
||||
payload = json.loads((FIXTURE_ROOT / name).read_text(encoding="utf-8"))
|
||||
return ParseDict(payload, pb.MxCommandReply())
|
||||
@@ -0,0 +1,49 @@
|
||||
"""Tests for MXAccess value conversion helpers."""
|
||||
|
||||
import json
|
||||
import re
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
from google.protobuf.json_format import ParseDict
|
||||
|
||||
from mxgateway.generated import mxaccess_gateway_pb2 as pb
|
||||
from mxgateway.values import from_mx_value, to_mx_value
|
||||
|
||||
FIXTURE_ROOT = Path(__file__).resolve().parents[2] / "proto" / "fixtures" / "behavior"
|
||||
|
||||
|
||||
def test_value_conversion_fixtures_project_expected_oneof_kind() -> None:
|
||||
payload = json.loads(
|
||||
(FIXTURE_ROOT / "values" / "value-conversion-cases.json").read_text(
|
||||
encoding="utf-8",
|
||||
),
|
||||
)
|
||||
|
||||
for case in payload["cases"]:
|
||||
value = ParseDict(case["value"], pb.MxValue())
|
||||
projection = from_mx_value(value)
|
||||
|
||||
assert projection.kind == _snake_case(case["expectedKind"])
|
||||
assert projection.raw is value
|
||||
|
||||
|
||||
def test_to_mx_value_supports_scalar_and_array_inputs() -> None:
|
||||
assert to_mx_value(True).WhichOneof("kind") == "bool_value"
|
||||
assert to_mx_value(12).int32_value == 12
|
||||
assert to_mx_value(2**40).int64_value == 2**40
|
||||
assert to_mx_value(12.5).double_value == 12.5
|
||||
assert to_mx_value("abc").string_value == "abc"
|
||||
assert to_mx_value([1, 2]).array_value.int32_values.values == [1, 2]
|
||||
assert to_mx_value(["a", "b"]).array_value.string_values.values == ["a", "b"]
|
||||
|
||||
|
||||
def test_to_mx_value_uses_utc_timestamps() -> None:
|
||||
value = to_mx_value(datetime(2026, 1, 1, 0, 0, 4, tzinfo=timezone.utc))
|
||||
|
||||
assert value.data_type == pb.MX_DATA_TYPE_TIME
|
||||
assert value.timestamp_value.seconds == 1767225604
|
||||
|
||||
|
||||
def _snake_case(value: str) -> str:
|
||||
return re.sub(r"(?<!^)(?=[A-Z])", "_", value).lower()
|
||||
@@ -76,6 +76,13 @@ stdout/stderr lines emitted during the run.
|
||||
|
||||
## Focused Commands
|
||||
|
||||
Run the parity fixture matrix tests after changing the integration parity
|
||||
scenario list:
|
||||
|
||||
```bash
|
||||
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter FullyQualifiedName~ParityFixtureMatrixTests
|
||||
```
|
||||
|
||||
Run the fake worker tests after changing gateway worker IPC, session startup, or
|
||||
event streaming behavior:
|
||||
|
||||
@@ -95,6 +102,7 @@ dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj
|
||||
|
||||
## Related Documentation
|
||||
|
||||
- [Parity Fixture Matrix](./ParityFixtureMatrix.md)
|
||||
- [Gateway Process Design](./gateway-process-design.md)
|
||||
- [Worker Frame Protocol](./WorkerFrameProtocol.md)
|
||||
- [MXAccess Worker Instance Detailed Design](./mxaccess-worker-instance-design.md)
|
||||
|
||||
@@ -0,0 +1,102 @@
|
||||
# Parity Fixture Matrix
|
||||
|
||||
The parity fixture matrix defines the live-test scenarios used to compare
|
||||
direct MXAccess behavior with the gateway-backed worker. It is a planning and
|
||||
validation fixture, not a source of synthetic MXAccess behavior.
|
||||
|
||||
The matrix lives in
|
||||
`clients/proto/fixtures/parity/parity-fixture-matrix.json`. It references the
|
||||
local MXAccess capture set under
|
||||
`C:/Users/dohertj2/Desktop/mxaccess/captures` and keeps capture paths relative
|
||||
to that root so the repository does not copy raw capture artifacts.
|
||||
|
||||
## Scope
|
||||
|
||||
The matrix covers every public `LMXProxyServerClass` method represented by the
|
||||
gateway contract:
|
||||
|
||||
- `Register`
|
||||
- `Unregister`
|
||||
- `AddItem`
|
||||
- `AddItem2`
|
||||
- `RemoveItem`
|
||||
- `Advise`
|
||||
- `UnAdvise`
|
||||
- `AdviseSupervisory`
|
||||
- `AddBufferedItem`
|
||||
- `SetBufferedUpdateInterval`
|
||||
- `Suspend`
|
||||
- `Activate`
|
||||
- `Write`
|
||||
- `Write2`
|
||||
- `WriteSecured`
|
||||
- `WriteSecured2`
|
||||
- `AuthenticateUser`
|
||||
- `ArchestrAUserToId`
|
||||
|
||||
Each entry is either a `planned_fixture` or a `documented_gap`.
|
||||
`WriteSecured` remains a documented gap because the current captures show
|
||||
`0x80004021` before MXAccess emits a value-bearing write body.
|
||||
`OperationComplete` and public `OnBufferedDataChange` batches also remain
|
||||
documented gaps because no capture in the current set proves those public event
|
||||
payloads from native MXAccess.
|
||||
|
||||
## Required Scenario Groups
|
||||
|
||||
The matrix pins the high-risk parity scenarios from the integration milestone:
|
||||
|
||||
| Scenario | Purpose |
|
||||
|----------|---------|
|
||||
| `invalid_handles` | Preserves invalid server, item, post-remove, and invalid-reference HRESULT/status behavior. |
|
||||
| `write_statuses` | Compares successful writes, wrong-type writes, invalid references, arrays, and write-complete status arrays. |
|
||||
| `secured_writes` | Covers observed `WriteSecured` rejection and authenticated `WriteSecured2` paths without logging credential-bearing values. |
|
||||
| `add_item_context` | Ensures `AddItem2` and buffered registration pass context strings exactly as supplied. |
|
||||
| `buffered_registration` | Tracks buffered item registration and interval setup separately from normal advice. |
|
||||
|
||||
## Comparison Format
|
||||
|
||||
Each live parity fixture should record one direct MXAccess result and one
|
||||
gateway result for the same operation.
|
||||
|
||||
Direct MXAccess records include:
|
||||
|
||||
- method name,
|
||||
- arguments after redaction,
|
||||
- returned value,
|
||||
- HRESULT,
|
||||
- exception type,
|
||||
- `MXSTATUS_PROXY[]` values,
|
||||
- native event records in observed order.
|
||||
|
||||
Gateway records include:
|
||||
|
||||
- `MxCommandKind`,
|
||||
- `ProtocolStatus`,
|
||||
- `MxCommandReply.ReturnValue`,
|
||||
- `MxCommandReply.Hresult`,
|
||||
- repeated `MxCommandReply.Statuses`,
|
||||
- safe diagnostic message,
|
||||
- streamed `MxEvent` records in worker-sequence order.
|
||||
|
||||
Compare HRESULT, exception type, returned value, status array shape, raw status
|
||||
fields, event family order, event payload shape, value projection, and raw
|
||||
fallback metadata. The gateway must not convert an MXAccess command failure
|
||||
into a transport failure when the worker captured HRESULT or status details.
|
||||
|
||||
## Validation
|
||||
|
||||
Run the parity fixture matrix tests after changing the matrix:
|
||||
|
||||
```bash
|
||||
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter FullyQualifiedName~ParityFixtureMatrixTests
|
||||
```
|
||||
|
||||
Live MXAccess execution remains opt-in. The matrix defines which scenarios to
|
||||
run when the installed MXAccess COM component and provider state are available;
|
||||
normal unit tests only validate the repository fixture shape.
|
||||
|
||||
## Related Documentation
|
||||
|
||||
- [Gateway Testing](./GatewayTesting.md)
|
||||
- [MXAccess Worker Instance Detailed Design](./mxaccess-worker-instance-design.md)
|
||||
- [Protobuf Contracts](./Contracts.md)
|
||||
@@ -137,9 +137,17 @@ The Python scaffold provides a repo-local generation script:
|
||||
clients/python/generate-proto.ps1
|
||||
```
|
||||
|
||||
Java clients should use the Gradle protobuf plugin and write generated sources
|
||||
under `clients/java/src/main/generated`. The Java client scaffold owns the
|
||||
Gradle plugin versions and source-set wiring.
|
||||
Java clients use the Gradle protobuf plugin from `clients/java`. The
|
||||
`mxgateway-client` project reads the shared `.proto` files and writes generated
|
||||
Java protobuf and gRPC sources under `clients/java/src/main/generated`, matching
|
||||
the manifest output path. Handwritten client and CLI code stays in the
|
||||
`mxgateway-client` and `mxgateway-cli` project source trees.
|
||||
|
||||
Run the Java workspace checks from `clients/java`:
|
||||
|
||||
```powershell
|
||||
gradle test
|
||||
```
|
||||
|
||||
## Golden Fixtures
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@ Recommended Gradle multi-project layout:
|
||||
clients/java/
|
||||
settings.gradle
|
||||
build.gradle
|
||||
src/main/generated/
|
||||
mxgateway-client/
|
||||
build.gradle
|
||||
src/main/java/com/dohertylan/mxgateway/client/
|
||||
@@ -31,6 +32,7 @@ Alternative Maven layout is acceptable if the repo standardizes on Maven.
|
||||
Target Java:
|
||||
|
||||
- Java 21 recommended.
|
||||
- The Gradle scaffold uses the Java 21 toolchain for compilation and tests.
|
||||
|
||||
Expected dependencies:
|
||||
|
||||
@@ -189,3 +191,16 @@ Publish library and CLI separately:
|
||||
|
||||
Generated protobuf code should be produced during the build from shared proto
|
||||
files and should not be hand-edited.
|
||||
|
||||
## Current Build
|
||||
|
||||
Run the Java scaffold checks from `clients/java`:
|
||||
|
||||
```powershell
|
||||
gradle test
|
||||
```
|
||||
|
||||
The `mxgateway-client` project generates the gateway and worker protobuf/gRPC
|
||||
bindings into `src/main/generated`, compiles the generated contracts, and runs
|
||||
JUnit 5 tests. The `mxgateway-cli` project builds a Picocli-based `mxgw-java`
|
||||
entry point for later command implementation.
|
||||
|
||||
@@ -0,0 +1,293 @@
|
||||
using System.Text.Json;
|
||||
using MxGateway.Contracts;
|
||||
|
||||
namespace MxGateway.Tests.Contracts;
|
||||
|
||||
public sealed class ParityFixtureMatrixTests
|
||||
{
|
||||
[Fact]
|
||||
public void Matrix_DeclaresCurrentProtocolVersionsAndComparisonFields()
|
||||
{
|
||||
using JsonDocument matrix = LoadParityMatrix();
|
||||
JsonElement root = matrix.RootElement;
|
||||
|
||||
Assert.Equal(1, root.GetProperty("schemaVersion").GetInt32());
|
||||
Assert.Equal("mxaccess-gateway-parity-fixture-matrix", root.GetProperty("fixtureSet").GetString());
|
||||
Assert.Equal(GatewayContractInfo.GatewayProtocolVersion, root.GetProperty("gatewayProtocolVersion").GetUInt32());
|
||||
Assert.Equal(GatewayContractInfo.WorkerProtocolVersion, root.GetProperty("workerProtocolVersion").GetUInt32());
|
||||
|
||||
JsonElement comparisonFormat = root.GetProperty("comparisonFormat");
|
||||
AssertRequiredFields(
|
||||
comparisonFormat.GetProperty("directMxAccess").GetProperty("requiredFields"),
|
||||
"method",
|
||||
"arguments",
|
||||
"returnedValue",
|
||||
"hresult",
|
||||
"statuses",
|
||||
"events");
|
||||
AssertRequiredFields(
|
||||
comparisonFormat.GetProperty("gatewayResult").GetProperty("requiredFields"),
|
||||
"kind",
|
||||
"protocolStatus",
|
||||
"returnValue",
|
||||
"hresult",
|
||||
"statuses",
|
||||
"events");
|
||||
AssertRequiredFields(
|
||||
comparisonFormat.GetProperty("eventFields"),
|
||||
"family",
|
||||
"value",
|
||||
"quality",
|
||||
"sourceTimestamp",
|
||||
"statuses",
|
||||
"workerSequence");
|
||||
AssertRequiredFields(
|
||||
comparisonFormat.GetProperty("comparisonKeys"),
|
||||
"hresult",
|
||||
"statusArrayShape",
|
||||
"statusRawFields",
|
||||
"eventFamilyOrder",
|
||||
"eventPayloadShape",
|
||||
"valueProjection",
|
||||
"rawFallbackMetadata");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Matrix_CoversEveryPublicMxAccessMethod()
|
||||
{
|
||||
using JsonDocument matrix = LoadParityMatrix();
|
||||
JsonElement methodFixtures = matrix.RootElement.GetProperty("methodFixtures");
|
||||
|
||||
Dictionary<string, JsonElement> fixturesByMethod = [];
|
||||
HashSet<string> ids = new(StringComparer.Ordinal);
|
||||
|
||||
foreach (JsonElement fixture in methodFixtures.EnumerateArray())
|
||||
{
|
||||
string id = fixture.GetProperty("id").GetString()!;
|
||||
string method = fixture.GetProperty("method").GetString()!;
|
||||
string commandKind = fixture.GetProperty("commandKind").GetString()!;
|
||||
string status = fixture.GetProperty("status").GetString()!;
|
||||
|
||||
Assert.True(ids.Add(id), $"Duplicate parity fixture id '{id}'.");
|
||||
Assert.True(fixturesByMethod.TryAdd(method, fixture), $"Duplicate parity method '{method}'.");
|
||||
Assert.StartsWith("MX_COMMAND_KIND_", commandKind, StringComparison.Ordinal);
|
||||
Assert.Contains(status, KnownFixtureStatuses);
|
||||
Assert.NotEmpty(fixture.GetProperty("assertions").EnumerateArray());
|
||||
AssertCaptureReferencesAreRelative(fixture.GetProperty("captureReferences"));
|
||||
}
|
||||
|
||||
Assert.Equal(ExpectedPublicMethods.Order(StringComparer.Ordinal), fixturesByMethod.Keys.Order(StringComparer.Ordinal));
|
||||
|
||||
foreach (string method in ExpectedPublicMethods)
|
||||
{
|
||||
JsonElement fixture = fixturesByMethod[method];
|
||||
string status = fixture.GetProperty("status").GetString()!;
|
||||
|
||||
Assert.True(
|
||||
status == "planned_fixture" || status == "documented_gap",
|
||||
$"Method '{method}' must have a planned parity fixture or documented gap.");
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Matrix_CoversRequiredParityScenarioGroups()
|
||||
{
|
||||
using JsonDocument matrix = LoadParityMatrix();
|
||||
HashSet<string> knownFixtureIds = GetFixtureIds(matrix.RootElement);
|
||||
Dictionary<string, JsonElement> groupsById = [];
|
||||
|
||||
foreach (JsonElement group in matrix.RootElement.GetProperty("scenarioGroups").EnumerateArray())
|
||||
{
|
||||
string id = group.GetProperty("id").GetString()!;
|
||||
|
||||
Assert.True(groupsById.TryAdd(id, group), $"Duplicate parity scenario group '{id}'.");
|
||||
Assert.NotEmpty(group.GetProperty("description").GetString()!);
|
||||
Assert.NotEmpty(group.GetProperty("fixtureIds").EnumerateArray());
|
||||
AssertCaptureReferencesAreRelative(group.GetProperty("captureReferences"));
|
||||
|
||||
foreach (JsonElement fixtureIdElement in group.GetProperty("fixtureIds").EnumerateArray())
|
||||
{
|
||||
string fixtureId = fixtureIdElement.GetString()!;
|
||||
Assert.Contains(fixtureId, knownFixtureIds);
|
||||
}
|
||||
}
|
||||
|
||||
foreach (string requiredGroup in RequiredScenarioGroups)
|
||||
{
|
||||
Assert.True(groupsById.ContainsKey(requiredGroup), $"Missing required parity scenario group '{requiredGroup}'.");
|
||||
}
|
||||
|
||||
AssertScenarioCovers(groupsById["invalid_handles"], "method.remove-item.basic", "method.write.value-status-matrix");
|
||||
AssertScenarioCovers(groupsById["write_statuses"], "method.write.value-status-matrix", "event.on-write-complete.status");
|
||||
AssertScenarioCovers(groupsById["secured_writes"], "method.write-secured.rejection-gap", "method.write-secured2.authenticated");
|
||||
AssertScenarioCovers(groupsById["add_item_context"], "method.add-item2.context", "method.add-buffered-item.context");
|
||||
AssertScenarioCovers(groupsById["buffered_registration"], "method.add-buffered-item.context", "event.on-buffered-data-change.batch-gap");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Matrix_CoversEveryPublicMxAccessEventFamily()
|
||||
{
|
||||
using JsonDocument matrix = LoadParityMatrix();
|
||||
Dictionary<string, JsonElement> fixturesByFamily = [];
|
||||
|
||||
foreach (JsonElement fixture in matrix.RootElement.GetProperty("eventFixtures").EnumerateArray())
|
||||
{
|
||||
string family = fixture.GetProperty("family").GetString()!;
|
||||
string status = fixture.GetProperty("status").GetString()!;
|
||||
|
||||
Assert.True(fixturesByFamily.TryAdd(family, fixture), $"Duplicate parity event family '{family}'.");
|
||||
Assert.Contains(status, KnownFixtureStatuses);
|
||||
Assert.NotEmpty(fixture.GetProperty("assertions").EnumerateArray());
|
||||
AssertCaptureReferencesAreRelative(fixture.GetProperty("captureReferences"));
|
||||
}
|
||||
|
||||
foreach (string eventFamily in ExpectedEventFamilies)
|
||||
{
|
||||
Assert.True(fixturesByFamily.ContainsKey(eventFamily), $"Missing parity fixture for event family '{eventFamily}'.");
|
||||
}
|
||||
|
||||
Assert.Equal("documented_gap", fixturesByFamily["MX_EVENT_FAMILY_OPERATION_COMPLETE"].GetProperty("status").GetString());
|
||||
Assert.Equal("documented_gap", fixturesByFamily["MX_EVENT_FAMILY_ON_BUFFERED_DATA_CHANGE"].GetProperty("status").GetString());
|
||||
}
|
||||
|
||||
private static readonly string[] ExpectedPublicMethods =
|
||||
[
|
||||
"Register",
|
||||
"Unregister",
|
||||
"AddItem",
|
||||
"AddItem2",
|
||||
"RemoveItem",
|
||||
"Advise",
|
||||
"UnAdvise",
|
||||
"AdviseSupervisory",
|
||||
"AddBufferedItem",
|
||||
"SetBufferedUpdateInterval",
|
||||
"Suspend",
|
||||
"Activate",
|
||||
"Write",
|
||||
"Write2",
|
||||
"WriteSecured",
|
||||
"WriteSecured2",
|
||||
"AuthenticateUser",
|
||||
"ArchestrAUserToId",
|
||||
];
|
||||
|
||||
private static readonly string[] ExpectedEventFamilies =
|
||||
[
|
||||
"MX_EVENT_FAMILY_ON_DATA_CHANGE",
|
||||
"MX_EVENT_FAMILY_ON_WRITE_COMPLETE",
|
||||
"MX_EVENT_FAMILY_OPERATION_COMPLETE",
|
||||
"MX_EVENT_FAMILY_ON_BUFFERED_DATA_CHANGE",
|
||||
];
|
||||
|
||||
private static readonly string[] RequiredScenarioGroups =
|
||||
[
|
||||
"invalid_handles",
|
||||
"write_statuses",
|
||||
"secured_writes",
|
||||
"add_item_context",
|
||||
"buffered_registration",
|
||||
];
|
||||
|
||||
private static readonly string[] KnownFixtureStatuses =
|
||||
[
|
||||
"planned_fixture",
|
||||
"documented_gap",
|
||||
];
|
||||
|
||||
private static void AssertRequiredFields(
|
||||
JsonElement fields,
|
||||
params string[] expectedFields)
|
||||
{
|
||||
HashSet<string> declared = fields
|
||||
.EnumerateArray()
|
||||
.Select(field => field.GetString()!)
|
||||
.ToHashSet(StringComparer.Ordinal);
|
||||
|
||||
foreach (string expectedField in expectedFields)
|
||||
{
|
||||
Assert.Contains(expectedField, declared);
|
||||
}
|
||||
}
|
||||
|
||||
private static void AssertCaptureReferencesAreRelative(JsonElement captureReferences)
|
||||
{
|
||||
int count = 0;
|
||||
|
||||
foreach (JsonElement captureReference in captureReferences.EnumerateArray())
|
||||
{
|
||||
string path = captureReference.GetString()!;
|
||||
|
||||
Assert.StartsWith("captures/", path, StringComparison.Ordinal);
|
||||
Assert.DoesNotContain("\\", path, StringComparison.Ordinal);
|
||||
Assert.False(Path.IsPathRooted(path), $"Capture reference '{path}' must be relative.");
|
||||
count++;
|
||||
}
|
||||
|
||||
Assert.True(count > 0, "Each parity fixture must reference at least one MXAccess capture.");
|
||||
}
|
||||
|
||||
private static void AssertScenarioCovers(
|
||||
JsonElement group,
|
||||
params string[] fixtureIds)
|
||||
{
|
||||
HashSet<string> declared = group
|
||||
.GetProperty("fixtureIds")
|
||||
.EnumerateArray()
|
||||
.Select(fixtureId => fixtureId.GetString()!)
|
||||
.ToHashSet(StringComparer.Ordinal);
|
||||
|
||||
foreach (string fixtureId in fixtureIds)
|
||||
{
|
||||
Assert.Contains(fixtureId, declared);
|
||||
}
|
||||
}
|
||||
|
||||
private static HashSet<string> GetFixtureIds(JsonElement root)
|
||||
{
|
||||
HashSet<string> ids = new(StringComparer.Ordinal);
|
||||
|
||||
foreach (JsonElement fixture in root.GetProperty("methodFixtures").EnumerateArray())
|
||||
{
|
||||
ids.Add(fixture.GetProperty("id").GetString()!);
|
||||
}
|
||||
|
||||
foreach (JsonElement fixture in root.GetProperty("eventFixtures").EnumerateArray())
|
||||
{
|
||||
ids.Add(fixture.GetProperty("id").GetString()!);
|
||||
}
|
||||
|
||||
return ids;
|
||||
}
|
||||
|
||||
private static JsonDocument LoadParityMatrix()
|
||||
{
|
||||
return JsonDocument.Parse(File.ReadAllText(Path.Combine(GetParityFixtureRoot().FullName, "parity-fixture-matrix.json")));
|
||||
}
|
||||
|
||||
private static DirectoryInfo GetParityFixtureRoot()
|
||||
{
|
||||
DirectoryInfo repositoryRoot = FindRepositoryRoot();
|
||||
|
||||
return new DirectoryInfo(Path.Combine(repositoryRoot.FullName, "clients", "proto", "fixtures", "parity"));
|
||||
}
|
||||
|
||||
private static DirectoryInfo FindRepositoryRoot()
|
||||
{
|
||||
DirectoryInfo? current = new(AppContext.BaseDirectory);
|
||||
|
||||
while (current is not null)
|
||||
{
|
||||
if (File.Exists(Path.Combine(current.FullName, "AGENTS.md"))
|
||||
&& Directory.Exists(Path.Combine(current.FullName, "src"))
|
||||
&& Directory.Exists(Path.Combine(current.FullName, "clients")))
|
||||
{
|
||||
return current;
|
||||
}
|
||||
|
||||
current = current.Parent;
|
||||
}
|
||||
|
||||
throw new DirectoryNotFoundException("Could not locate the repository root from the test output directory.");
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user