e2e: drive each client CLI through one long-lived batch process

The cross-language e2e matrix spawned one CLI process per operation —
~250 per client — paying a process (and, for the Java CLI, a full JVM)
cold-start every time. The Java leg alone ran ~16 minutes.

Each client CLI (dotnet, go, rust, python, java) gains a `batch`
subcommand: a single process that reads one command line from stdin,
runs it through the normal subcommand dispatch, writes the JSON result,
then a line containing exactly `__MXGW_BATCH_EOR__`. A failing command
writes its `{"error":...}` envelope and the loop continues.

run-client-e2e-tests.ps1 now launches one batch process per client and
pipes every operation through its stdin/stdout, so startup is paid once
per client. The orchestration and assertions are unchanged; the parity
and auth phases now read the `{"error":...}` envelope instead of a
process exit code.

Full 5-client matrix with -VerifyWrite: ~15 min, down from ~35; the Java
leg dropped from ~16 min to ~2-3.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-21 06:20:13 -04:00
parent c1ff8c94e8
commit 6126099cdb
10 changed files with 970 additions and 47 deletions
@@ -25,7 +25,7 @@ public static class MxGatewayClientCli
TextWriter standardOutput,
TextWriter standardError)
{
return RunAsync(args, standardOutput, standardError)
return RunAsync(args, standardOutput, standardError, clientFactory: null, standardInput: null)
.GetAwaiter()
.GetResult();
}
@@ -35,11 +35,13 @@ public static class MxGatewayClientCli
/// <param name="standardOutput">TextWriter for command output.</param>
/// <param name="standardError">TextWriter for error messages.</param>
/// <param name="clientFactory">Optional factory to create the gateway client; defaults to MxGatewayClient.Create.</param>
/// <param name="standardInput">Optional TextReader for batch-mode stdin; defaults to <see cref="Console.In"/>.</param>
public static Task<int> RunAsync(
string[] args,
TextWriter standardOutput,
TextWriter standardError,
Func<MxGatewayClientOptions, IMxGatewayCliClient>? clientFactory = null)
Func<MxGatewayClientOptions, IMxGatewayCliClient>? clientFactory = null,
TextReader? standardInput = null)
{
ArgumentNullException.ThrowIfNull(args);
ArgumentNullException.ThrowIfNull(standardOutput);
@@ -49,14 +51,19 @@ public static class MxGatewayClientCli
args,
standardOutput,
standardError,
clientFactory ?? CreateDefaultClient);
clientFactory ?? CreateDefaultClient,
standardInput ?? Console.In);
}
private const string BatchEndOfRecord = "__MXGW_BATCH_EOR__";
private static async Task<int> RunCoreAsync(
string[] args,
TextWriter standardOutput,
TextWriter standardError,
Func<MxGatewayClientOptions, IMxGatewayCliClient> clientFactory)
Func<MxGatewayClientOptions, IMxGatewayCliClient> clientFactory,
TextReader standardInput,
bool forceJsonErrors = false)
{
if (args.Length is 0 || IsHelp(args[0]))
{
@@ -65,6 +72,12 @@ public static class MxGatewayClientCli
}
string command = args[0].ToLowerInvariant();
if (command is "batch")
{
return await RunBatchAsync(standardOutput, clientFactory, standardInput).ConfigureAwait(false);
}
CliArguments arguments = new(args.Skip(1));
try
@@ -142,7 +155,7 @@ public static class MxGatewayClientCli
string? apiKey = TryResolveApiKey(arguments);
string message = MxGatewayCliSecretRedactor.Redact(exception.Message, apiKey);
if (arguments.HasFlag("json"))
if (forceJsonErrors || arguments.HasFlag("json"))
{
standardError.WriteLine(JsonSerializer.Serialize(
new { error = message, type = exception.GetType().Name },
@@ -239,6 +252,82 @@ public static class MxGatewayClientCli
return cancellation;
}
/// <summary>
/// Runs the CLI in batch mode. Each iteration reads one line from
/// <paramref name="standardInput"/>, splits it on whitespace (no quoting
/// support, by design), runs it through <c>RunCoreAsync</c> with JSON error
/// formatting forced on, and then writes the command's buffered output
/// followed by its buffered error text to <paramref name="standardOutput"/>.
/// A line containing <see cref="BatchEndOfRecord"/> is written after every
/// command and the writer is flushed, so the caller can delimit results.
/// </summary>
/// <remarks>
/// A command failure does not end the loop. A <c>batch</c> line inside a
/// batch session is rejected with an error record: dispatching it would
/// start a second loop on the same reader while its output is still being
/// buffered, so the caller would never see a sentinel for that line.
/// </remarks>
/// <param name="standardOutput">Writer that receives every record and sentinel.</param>
/// <param name="clientFactory">Factory handed to each dispatched command.</param>
/// <param name="standardInput">Reader supplying one command line per record.</param>
/// <returns>0 once EOF or an empty line is read.</returns>
private static async Task<int> RunBatchAsync(
    TextWriter standardOutput,
    Func<MxGatewayClientOptions, IMxGatewayCliClient> clientFactory,
    TextReader standardInput)
{
    while (true)
    {
        string? line = await standardInput.ReadLineAsync().ConfigureAwait(false);

        // EOF or empty line signals clean exit.
        if (string.IsNullOrEmpty(line))
        {
            return 0;
        }

        // Split on runs of whitespace — no quoting support by design.
        string[] lineArgs = line.Split((char[]?)null, StringSplitOptions.RemoveEmptyEntries);

        // Output and errors are buffered per command so that both can be emitted
        // on stdout, in order, before the sentinel.
        using StringWriter commandOutput = new();
        using StringWriter commandError = new();

        // Same normalisation the dispatcher applies before matching "batch".
        if (lineArgs.Length > 0 && lineArgs[0].ToLowerInvariant() is "batch")
        {
            commandError.WriteLine(JsonSerializer.Serialize(
                new
                {
                    error = "batch cannot be nested inside another batch session",
                    type = nameof(InvalidOperationException),
                },
                JsonOptions));
        }
        else
        {
            try
            {
                await RunCoreAsync(
                        lineArgs,
                        commandOutput,
                        commandError,
                        clientFactory,
                        standardInput,
                        forceJsonErrors: true)
                    .ConfigureAwait(false);
            }
            catch (Exception exception) when (exception is not OperationCanceledException)
            {
                // Unexpected exception that escaped RunCoreAsync (shouldn't happen, but be safe).
                commandError.WriteLine(JsonSerializer.Serialize(
                    new { error = exception.Message, type = exception.GetType().Name },
                    JsonOptions));
            }
        }

        // Normal output first, then error output; both belong on stdout in batch
        // mode so the harness sees them inside the delimited record.
        standardOutput.Write(commandOutput.ToString());
        standardOutput.Write(commandError.ToString());

        // Write the end-of-record sentinel and flush so the harness can unblock.
        standardOutput.WriteLine(BatchEndOfRecord);
        await standardOutput.FlushAsync().ConfigureAwait(false);
    }
}
private static Task<int> OpenSessionAsync(
CliArguments arguments,
IMxGatewayCliClient client,
@@ -1861,6 +1950,7 @@ public static class MxGatewayClientCli
private static void WriteUsage(TextWriter writer)
{
writer.WriteLine("mxgw-dotnet batch (reads commands from stdin; writes output + __MXGW_BATCH_EOR__ after each)");
writer.WriteLine("mxgw-dotnet version [--json]");
writer.WriteLine("mxgw-dotnet ping --session-id <id> [--json]");
writer.WriteLine("mxgw-dotnet open-session [--client-name <name>] [--json]");
@@ -468,6 +468,141 @@ public sealed class MxGatewayClientCliTests
Assert.Contains("\"objectCount\": 99", text);
}
/// <summary>Confirms that a single <c>version --json</c> line in batch mode yields the version payload followed by the end-of-record sentinel, with nothing on stderr.</summary>
[Fact]
public async Task RunAsync_Batch_SingleVersionCommand_WritesOutputAndEorSentinel()
{
    const string sentinel = "__MXGW_BATCH_EOR__";
    using StringReader input = new("version --json\n");
    using StringWriter stdout = new();
    using StringWriter stderr = new();

    int result = await MxGatewayClientCli.RunAsync(
        ["batch"],
        stdout,
        stderr,
        clientFactory: null,
        standardInput: input);

    Assert.Equal(0, result);

    string captured = stdout.ToString();
    Assert.Contains("\"gatewayProtocolVersion\"", captured);
    Assert.Contains(sentinel, captured);

    // The payload has to come first; the sentinel closes the record.
    int payloadAt = captured.IndexOf("gatewayProtocolVersion", StringComparison.Ordinal);
    int sentinelAt = captured.IndexOf(sentinel, StringComparison.Ordinal);
    Assert.True(payloadAt < sentinelAt, "EOR sentinel must follow command output.");

    Assert.Equal(string.Empty, stderr.ToString());
}
/// <summary>Confirms that two input lines are handled in order and that each one is closed by its own end-of-record sentinel.</summary>
[Fact]
public async Task RunAsync_Batch_TwoVersionCommands_WritesTwoEorSentinels()
{
    const string sentinel = "__MXGW_BATCH_EOR__";

    // Two commands; the reader hits EOF right after the second one.
    using StringReader input = new("version\nversion --json\n");
    using StringWriter stdout = new();
    using StringWriter stderr = new();

    int result = await MxGatewayClientCli.RunAsync(
        ["batch"],
        stdout,
        stderr,
        clientFactory: null,
        standardInput: input);

    Assert.Equal(0, result);

    string captured = stdout.ToString();
    int first = captured.IndexOf(sentinel, StringComparison.Ordinal);
    int second = captured.IndexOf(sentinel, first + 1, StringComparison.Ordinal);
    Assert.True(first >= 0, "First EOR sentinel must be present.");
    Assert.True(second > first, "Second EOR sentinel must follow first.");

    Assert.Equal(string.Empty, stderr.ToString());
}
/// <summary>Confirms that batch mode given an already-exhausted stdin returns 0 and writes nothing to either stream.</summary>
[Fact]
public async Task RunAsync_Batch_EmptyStdin_ExitsZeroWithNoOutput()
{
    using StringReader input = new(string.Empty);
    using StringWriter stdout = new();
    using StringWriter stderr = new();

    int result = await MxGatewayClientCli.RunAsync(
        ["batch"],
        stdout,
        stderr,
        clientFactory: null,
        standardInput: input);

    Assert.Equal(0, result);
    Assert.Equal(string.Empty, stdout.ToString());
    Assert.Equal(string.Empty, stderr.ToString());
}
/// <summary>
/// Confirms that a failing command does not stop the batch loop: its error JSON
/// lands on stdout (stderr stays empty), a sentinel follows it, and the next
/// command still runs and produces its own record.
/// </summary>
[Fact]
public async Task RunAsync_Batch_CommandFailure_WritesErrorJsonToStdoutAndContinues()
{
    const string sentinel = "__MXGW_BATCH_EOR__";

    // Line 1 is a gateway command that is expected to fail (no API key is
    // supplied and the injected client factory throws); line 2 is a plain
    // version query that should succeed.
    using StringReader input = new("open-session --endpoint http://localhost:5000\nversion --json\n");
    using StringWriter stdout = new();
    using StringWriter stderr = new();

    int result = await MxGatewayClientCli.RunAsync(
        ["batch"],
        stdout,
        stderr,
        clientFactory: _ => throw new InvalidOperationException("injected failure"),
        standardInput: input);

    Assert.Equal(0, result);

    // The error record is reported on stdout; stderr must stay untouched.
    string captured = stdout.ToString();
    Assert.Contains("\"error\"", captured);
    Assert.Equal(string.Empty, stderr.ToString());

    // One sentinel per input line.
    int first = captured.IndexOf(sentinel, StringComparison.Ordinal);
    int second = captured.IndexOf(sentinel, first + 1, StringComparison.Ordinal);
    Assert.True(first >= 0, "EOR after failed command must be present.");
    Assert.True(second > first, "EOR after successful command must follow first EOR.");

    // Everything after the first sentinel belongs to the version record.
    string secondRecord = captured.Substring(first + sentinel.Length);
    Assert.Contains("\"gatewayProtocolVersion\"", secondRecord);
}
/// <summary>Confirms that an empty line ends the batch session with exit code 0 and that commands after it are never executed.</summary>
[Fact]
public async Task RunAsync_Batch_EmptyLine_ExitsZeroAfterPreviousCommands()
{
    const string sentinel = "__MXGW_BATCH_EOR__";

    // Command, blank line (the stop signal), then a command that must be ignored.
    using StringReader input = new("version --json\n\nversion --json\n");
    using StringWriter stdout = new();
    using StringWriter stderr = new();

    int result = await MxGatewayClientCli.RunAsync(
        ["batch"],
        stdout,
        stderr,
        clientFactory: null,
        standardInput: input);

    Assert.Equal(0, result);

    // Exactly one sentinel: a second one would mean the trailing command ran.
    string captured = stdout.ToString();
    int first = captured.IndexOf(sentinel, StringComparison.Ordinal);
    int second = captured.IndexOf(sentinel, first + 1, StringComparison.Ordinal);
    Assert.True(first >= 0, "One EOR sentinel must be present.");
    Assert.Equal(-1, second);

    Assert.Equal(string.Empty, stderr.ToString());
}
/// <summary>Fake CLI client for testing.</summary>
private sealed class FakeCliClient : IMxGatewayCliClient
{
+40 -1
View File
@@ -6,6 +6,7 @@
package main
import (
"bufio"
"context"
"encoding/json"
"errors"
@@ -116,6 +117,8 @@ func runWithIO(ctx context.Context, args []string, stdout, stderr io.Writer) err
return runGalaxyDiscover(ctx, args[1:], stdout, stderr)
case "galaxy-watch":
return runGalaxyWatch(ctx, args[1:], stdout, stderr)
case "batch":
return runBatch(ctx, os.Stdin, stdout, stderr)
default:
writeUsage(stderr)
return fmt.Errorf("unknown command %q", args[0])
@@ -1080,8 +1083,44 @@ type protojsonMessage interface {
ProtoReflect() protoreflect.Message
}
// batchEOR is the end-of-result sentinel emitted to stdout after every command
// in batch mode, regardless of success or failure.
const batchEOR = "__MXGW_BATCH_EOR__"
// runBatch reads one command line at a time from in, dispatches each via the
// normal runWithIO routing, and writes a batchEOR sentinel to stdout after
// every result. Errors are serialised as JSON to stdout (not stderr) so the
// harness can parse them without interleaving stderr. The loop never terminates
// on command error; only stdin EOF (or an empty line) ends the session.
//
// A whitespace-only line is skipped without emitting a record. A "batch" line
// is rejected with an error record instead of being dispatched: a nested loop
// would write into this loop's unflushed buffer, so the harness would never
// receive the sentinel for that line.
//
// The returned error is the scanner's error, if any; command failures are
// reported only through the JSON records.
func runBatch(ctx context.Context, in io.Reader, stdout, stderr io.Writer) error {
	// bufio.Scanner stops with ErrTooLong once a line exceeds its limit
	// (64 KiB by default), which would end the whole session on one long
	// command line. Raise the ceiling well above that.
	const maxLineBytes = 4 * 1024 * 1024

	bw := bufio.NewWriter(stdout)
	scanner := bufio.NewScanner(in)
	scanner.Buffer(make([]byte, 0, 64*1024), maxLineBytes)

	for scanner.Scan() {
		line := scanner.Text()
		if line == "" {
			break
		}
		args := strings.Fields(line)
		if len(args) == 0 {
			continue
		}

		var err error
		if args[0] == "batch" {
			err = errors.New("batch cannot be nested inside another batch session")
		} else {
			err = runWithIO(ctx, args, bw, stderr)
		}
		if err != nil {
			// Write error as JSON to stdout (bw) so the harness sees it in the
			// same stream as normal output, framed by the EOR sentinel.
			errPayload := map[string]string{
				"error": err.Error(),
				"type":  "error",
			}
			_ = writeJSON(bw, errPayload)
		}

		// Sentinel plus flush after every command so the harness can unblock.
		_, _ = fmt.Fprintln(bw, batchEOR)
		_ = bw.Flush()
	}
	return scanner.Err()
}
func writeUsage(writer io.Writer) {
fmt.Fprintln(writer, "usage: mxgw-go <version|open-session|close-session|register|add-item|advise|subscribe-bulk|unsubscribe-bulk|read-bulk|write-bulk|write2-bulk|write-secured-bulk|write-secured2-bulk|bench-read-bulk|write|stream-events|smoke|galaxy-test-connection|galaxy-last-deploy|galaxy-discover|galaxy-watch>")
fmt.Fprintln(writer, "usage: mxgw-go <version|open-session|close-session|register|add-item|advise|subscribe-bulk|unsubscribe-bulk|read-bulk|write-bulk|write2-bulk|write-secured-bulk|write-secured2-bulk|bench-read-bulk|write|stream-events|smoke|galaxy-test-connection|galaxy-last-deploy|galaxy-discover|galaxy-watch|batch>")
}
func dialGalaxyForCommand(ctx context.Context, common *commonOptions) (*mxgateway.GalaxyClient, commonOptions, error) {
@@ -14,7 +14,11 @@ import galaxy_repository.v1.GalaxyRepositoryOuterClass.GalaxyAttribute;
import galaxy_repository.v1.GalaxyRepositoryOuterClass.GalaxyObject;
import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
@@ -128,9 +132,90 @@ public final class MxGatewayCli implements Callable<Integer> {
commandLine.addSubcommand("galaxy-deploy-time", new GalaxyDeployTimeCommand());
commandLine.addSubcommand("galaxy-discover", new GalaxyDiscoverCommand());
commandLine.addSubcommand("galaxy-watch", new GalaxyWatchCommand());
commandLine.addSubcommand("batch", new BatchCommand(clientFactory));
return commandLine;
}
/** Sentinel written to stdout after every command result in batch mode. */
static final String BATCH_EOR = "__MXGW_BATCH_EOR__";
/**
 * Reads one CLI invocation per stdin line, executes each via a fresh
 * {@link CommandLine}, and writes {@value #BATCH_EOR} to stdout after
 * every result. Errors are written as JSON to stdout so the harness
 * sees them in the same stream, delimited by the same sentinel. The
 * loop never terminates on command failure; only stdin EOF (or an
 * empty line) ends the session.
 *
 * <p>A whitespace-only line is skipped without emitting a record. A
 * {@code batch} line is rejected with an error record rather than
 * executed: a nested instance would open (and on exit close) its own
 * reader over {@code System.in} while its output is still buffered,
 * so the harness would never receive the sentinel for that line.
 */
@Command(name = "batch", description = "Reads CLI invocations from stdin and executes them sequentially.")
static final class BatchCommand implements Callable<Integer> {
    private final MxGatewayCliClientFactory clientFactory;

    @Spec
    private CommandSpec spec;

    BatchCommand(MxGatewayCliClientFactory clientFactory) {
        this.clientFactory = clientFactory;
    }

    /**
     * Runs the batch loop until stdin reaches EOF or yields an empty line.
     *
     * @return always 0; individual command failures are reported as JSON records
     */
    @Override
    public Integer call() {
        PrintWriter out = spec.commandLine().getOut();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(System.in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.isEmpty()) {
                    break;
                }
                String[] args = line.trim().split("\\s+");
                if (args.length == 0 || (args.length == 1 && args[0].isEmpty())) {
                    continue;
                }
                if ("batch".equals(args[0])) {
                    printError(out, "batch cannot be nested inside another batch session");
                } else {
                    runOne(args, out);
                }
                out.println(BATCH_EOR);
                out.flush();
            }
        } catch (java.io.IOException ex) {
            // Stdin closed unexpectedly — treat as EOF and exit normally.
        }
        return 0;
    }

    /** Executes one invocation on a fresh CommandLine and relays its output, or an error record, to {@code out}. */
    private void runOne(String[] args, PrintWriter out) {
        StringWriter cmdOut = new StringWriter();
        StringWriter cmdErr = new StringWriter();
        PrintWriter cmdOutWriter = new PrintWriter(cmdOut, true);
        PrintWriter cmdErrWriter = new PrintWriter(cmdErr, true);
        try {
            CommandLine cmd = commandLine(clientFactory);
            cmd.setOut(cmdOutWriter);
            cmd.setErr(cmdErrWriter);
            int exitCode = cmd.execute(args);
            cmdOutWriter.flush();
            cmdErrWriter.flush();

            String cmdOutput = cmdOut.toString();
            if (!cmdOutput.isEmpty()) {
                out.print(cmdOutput);
            }
            if (exitCode != 0) {
                // Non-zero exit: emit the stderr content (if any) as a JSON
                // error object to stdout so the harness can parse it in the
                // same delimited stream.
                String errText = cmdErr.toString().trim();
                if (errText.isEmpty()) {
                    errText = "command exited with code " + exitCode;
                }
                printError(out, errText);
            }
        } catch (Exception ex) {
            printError(out, ex.getMessage() != null ? ex.getMessage() : ex.getClass().getName());
        }
    }

    /** Writes one {@code {"error": ..., "type": "error"}} record line to {@code out}. */
    private static void printError(PrintWriter out, String message) {
        Map<String, Object> errorPayload = new LinkedHashMap<>();
        errorPayload.put("error", message);
        errorPayload.put("type", "error");
        out.println(jsonObject(errorPayload));
    }
}
abstract static class GalaxyCommand implements Callable<Integer> {
@Mixin
CommonOptions common = new CommonOptions();
@@ -4,8 +4,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import mxaccess_gateway.v1.MxaccessGateway.AddItemReply;
@@ -386,6 +389,90 @@ final class MxGatewayCliTests {
assertTrue(output.contains("TestMachine_002.TestChangingInt"), output);
}
// ---- Client.Java-027: batch subcommand ----
/** Two input lines must yield two end-of-record sentinels and version JSON in the output. */
@Test
void batchCommandExecutesTwoCommandsAndEmitsEorAfterEach() {
    CliRun run = executeBatch(new FakeClientFactory(), "version --json\nversion --json\n");

    assertEquals(0, run.exitCode());

    String captured = run.output();
    String sentinel = MxGatewayCli.BATCH_EOR;
    int first = captured.indexOf(sentinel);
    int last = captured.lastIndexOf(sentinel);

    // Distinct first/last positions prove there are at least two sentinels.
    assertTrue(first >= 0, "expected at least one EOR sentinel");
    assertTrue(last > first, "expected two distinct EOR sentinels");

    // The version payload shows up in the captured output.
    assertTrue(captured.contains("\"clientVersion\""), captured);
}
/** A failing first line must still be closed by a sentinel, and the following line must still run. */
@Test
void batchCommandEmitsEorOnFailedCommand() {
    // An unknown subcommand is used as the failing line: it is expected to
    // be rejected at parse time with a non-zero exit code, so the failure
    // does not depend on the gateway or on the fake client factory.
    CliRun run = executeBatch(new FakeClientFactory(), "no-such-subcommand\nversion --json\n");

    assertEquals(0, run.exitCode());

    String captured = run.output();
    String sentinel = MxGatewayCli.BATCH_EOR;
    int first = captured.indexOf(sentinel);
    int last = captured.lastIndexOf(sentinel);

    // One sentinel for the failed line, one for the successful line.
    assertTrue(first >= 0, "expected EOR after failed command");
    assertTrue(last > first, "expected EOR after second (successful) command");

    // Output of the second command made it through.
    assertTrue(captured.contains("\"clientVersion\""), captured);
}
/** A lone empty line is the stop signal; the batch command must return 0. */
@Test
void batchCommandExitsZeroOnEmptyLine() {
    String stdin = "\n";
    CliRun run = executeBatch(new FakeClientFactory(), stdin);

    assertEquals(0, run.exitCode());
}
/** With no stdin content at all (immediate EOF), the batch command must return 0. */
@Test
void batchCommandExitsZeroOnActualEof() {
    String stdin = "";
    CliRun run = executeBatch(new FakeClientFactory(), stdin);

    assertEquals(0, run.exitCode());
}
/** A failing line between two good lines must not stop the loop: three lines in, three sentinel lines out. */
@Test
void batchCommandDoesNotTerminateAfterFailedCommand() {
    // good, bad, good
    CliRun run = executeBatch(
            new FakeClientFactory(),
            "version --json\nno-such-subcommand\nversion --json\n");

    assertEquals(0, run.exitCode());

    String out = run.output();
    // Count only lines that consist of exactly the sentinel.
    long eorCount = out.lines().filter(MxGatewayCli.BATCH_EOR::equals).count();
    assertEquals(3, eorCount, "expected exactly 3 EOR sentinels, got: " + eorCount + "\nOutput:\n" + out);
}
/**
 * Invokes the CLI's {@code batch} subcommand with {@code stdinContent}
 * (UTF-8 encoded) installed as {@link System#in}. The previous
 * {@link System#in} is put back before this method returns, whether or
 * not the run throws.
 */
private static CliRun executeBatch(MxGatewayCli.MxGatewayCliClientFactory factory, String stdinContent) {
    final InputStream savedIn = System.in;
    final byte[] stdinBytes = stdinContent.getBytes(StandardCharsets.UTF_8);
    try {
        System.setIn(new ByteArrayInputStream(stdinBytes));
        return execute(factory, "batch");
    } finally {
        System.setIn(savedIn);
    }
}
private static CliRun execute(MxGatewayCli.MxGatewayCliClientFactory factory, String... args) {
StringWriter output = new StringWriter();
StringWriter errors = new StringWriter();
@@ -5,11 +5,13 @@ from __future__ import annotations
import asyncio
import json
import os
import sys
from collections.abc import Awaitable, Callable
from datetime import datetime, timezone
from typing import Any
import click
from click.testing import CliRunner
from google.protobuf.json_format import MessageToDict
from mxgateway import __version__
@@ -23,6 +25,8 @@ from mxgateway.values import MxValueInput, to_mx_value
MAX_AGGREGATE_EVENTS = 10_000
_BATCH_EOR = "__MXGW_BATCH_EOR__"
@click.group()
def main() -> None:
@@ -42,6 +46,80 @@ def version(output_json: bool) -> None:
_emit(payload, output_json=output_json, text=f"mxgw-py {__version__}")
@main.command()
def batch() -> None:
    """Read commands from stdin and execute each, writing output + __MXGW_BATCH_EOR__ after each.

    Each non-empty line of stdin is a complete argument string (no quoting support — the
    harness never passes whitespace-containing arguments). Lines are split on runs of ASCII
    whitespace and dispatched through the normal CLI parser. On EOF or an empty line, exit 0.
    Errors do NOT terminate the loop. Each command's output (including any error JSON) is
    written to stdout followed by a line containing exactly ``__MXGW_BATCH_EOR__``, then
    stdout is flushed. Error output is formatted as ``{"error": "...", "type": "..."}``.
    """
    # Each line is replayed through Click's CliRunner so the command's output and
    # exit status are captured instead of ending this process.
    runner = CliRunner()
    for raw_line in sys.stdin:
        line = raw_line.rstrip("\n").rstrip("\r")
        if not line:
            # Empty line signals clean exit (matches the spec and .NET behaviour).
            break

        args = line.split()
        try:
            result = runner.invoke(main, args, catch_exceptions=True)
        except Exception as exc:  # noqa: BLE001 — be safe; never let batch loop die
            _batch_write_error(exc.__class__.__name__, str(exc))
            _batch_flush_eor()
            continue

        output = result.output or ""
        exc = result.exception
        if result.exit_code == 0 or output.lstrip().startswith("{"):
            # Success, or a failure whose output already looks like a JSON object:
            # relay verbatim. A missing trailing newline is added so the sentinel
            # always sits on a line of its own.
            sys.stdout.write(output)
            if output and not output.endswith("\n"):
                sys.stdout.write("\n")
        elif exc is not None and not isinstance(exc, SystemExit):
            # The command raised: report the exception in the standard error shape.
            _batch_write_error(type(exc).__name__, str(exc))
        else:
            # Only a SystemExit (or no exception) reaches this branch, so the type is
            # always "CliError". Click's default error format is "Error: <message>\n";
            # strip the prefix so the harness gets a clean message.
            msg = output.strip()
            if msg.startswith("Error: "):
                msg = msg[len("Error: "):]
            _batch_write_error("CliError", msg)
        _batch_flush_eor()
def _batch_write_error(exc_type: str, message: str) -> None:
    """Emit one batch error record on stdout: a single JSON line with ``error`` then ``type``."""
    record = {"error": message, "type": exc_type}
    sys.stdout.write(f"{json.dumps(record)}\n")
def _batch_flush_eor() -> None:
    """Terminate the current batch record: write the sentinel line to stdout, then flush."""
    stream = sys.stdout
    stream.write(f"{_BATCH_EOR}\n")
    stream.flush()
def gateway_options(command: Callable[..., Any]) -> Callable[..., Any]:
"""Apply the shared gateway connection options to a Click command."""
command = click.option("--endpoint", default="localhost:5000", show_default=True)(command)
+142 -1
View File
@@ -1,13 +1,15 @@
"""Tests for the Python CLI."""
import io
import json
from typing import Any
import click
import pytest
from click.testing import CliRunner
from mxgateway import __version__
from mxgateway_cli.commands import _use_plaintext, main
from mxgateway_cli.commands import _BATCH_EOR, _use_plaintext, main
def test_version_json_is_deterministic() -> None:
@@ -216,3 +218,142 @@ def test_cli_localhost_endpoint_with_plaintext_flag_uses_plaintext(
assert result.exit_code != 0
assert captured.get("plaintext") is True
# ---------------------------------------------------------------------------
# batch subcommand tests
# ---------------------------------------------------------------------------
def _run_batch(lines: list[str]) -> tuple[int, list[str]]:
    """Run ``batch`` with ``lines`` as newline-terminated stdin; return (exit_code, stdout_lines)."""
    stdin_text = "".join(f"{entry}\n" for entry in lines[:-1]) + "\n".join(lines[-1:]) + "\n"
    result = CliRunner().invoke(main, ["batch"], input=stdin_text)
    return result.exit_code, result.output.splitlines()
def _split_records(stdout_lines: list[str]) -> list[list[str]]:
    """Group stdout lines into per-command records, closing a record at each sentinel line.

    Sentinel lines themselves are not included. Lines after the last sentinel are dropped.
    """
    records: list[list[str]] = []
    pending: list[str] = []
    for entry in stdout_lines:
        if entry != _BATCH_EOR:
            pending.append(entry)
            continue
        records.append(pending)
        pending = []
    return records
def test_batch_version_json_produces_eor_sentinel() -> None:
    """One ``version --json`` line yields exactly one record whose first line is the version JSON."""
    expected = {
        "client": "mxgw-py",
        "package": "mxaccess-gateway-client",
        "version": __version__,
    }

    status, stdout_lines = _run_batch(["version --json"])
    records = _split_records(stdout_lines)

    assert status == 0
    assert len(records) == 1
    assert json.loads(records[0][0]) == expected
def test_batch_two_commands_produce_two_delimited_records() -> None:
    """Two ``version --json`` lines yield two sentinel-delimited records, each with the client name."""
    status, stdout_lines = _run_batch(["version --json"] * 2)
    records = _split_records(stdout_lines)

    assert status == 0
    assert len(records) == 2
    clients = [json.loads(record[0])["client"] for record in records]
    assert clients == ["mxgw-py", "mxgw-py"]
def test_batch_eof_exits_zero() -> None:
    """With empty stdin (immediate EOF) the batch command exits with code 0."""
    outcome = CliRunner().invoke(main, ["batch"], input="")
    assert outcome.exit_code == 0
def test_batch_empty_line_exits_zero() -> None:
    """A single empty line ends the session with exit code 0 and no sentinel output."""
    status, stdout_lines = _run_batch([""])

    assert status == 0
    # The loop stopped before running anything, so no sentinel line was written.
    assert _BATCH_EOR not in stdout_lines
def test_batch_empty_line_stops_processing_subsequent_commands() -> None:
    """A command that follows an empty line is never executed."""
    status, stdout_lines = _run_batch(["", "version --json"])

    assert status == 0
    # The empty line ended the loop first, so there is not a single record.
    assert _split_records(stdout_lines) == []
def test_batch_failure_does_not_terminate_loop() -> None:
    """A command that fails to parse is reported as an error record and the next command still runs."""
    commands = ["open-session --unknown-flag", "version --json"]

    status, stdout_lines = _run_batch(commands)
    records = _split_records(stdout_lines)

    assert status == 0
    # One record per input line: the failure, then the success.
    assert len(records) == 2
    failure_record, success_record = records

    # The failure is reported as a JSON object carrying both standard keys.
    failure = json.loads(failure_record[0])
    assert "error" in failure
    assert "type" in failure

    # The command after the failure produced the version JSON.
    assert json.loads(success_record[0])["client"] == "mxgw-py"
def test_batch_error_record_has_required_json_shape() -> None:
    """The record for a failing command is JSON whose ``error`` and ``type`` values are strings."""
    status, stdout_lines = _run_batch(["open-session --unknown-flag"])
    records = _split_records(stdout_lines)

    assert status == 0
    assert len(records) == 1
    payload = json.loads(records[0][0])
    for key in ("error", "type"):
        assert isinstance(payload.get(key), str)
def test_batch_network_error_produces_error_json_not_terminates(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A connect-time failure (an injected ``RuntimeError``) on one command must not stop the loop."""

    async def _fake_connect(kwargs: dict[str, Any]) -> Any:
        raise RuntimeError("injected-network-failure")

    # Every connection attempt made by the CLI now fails immediately.
    monkeypatch.setattr("mxgateway_cli.commands.GatewayClient.connect", _fake_connect)

    commands = [
        "open-session --endpoint localhost:5000 --api-key mxgw_test --plaintext --json",
        "version --json",
    ]
    status, stdout_lines = _run_batch(commands)
    records = _split_records(stdout_lines)

    assert status == 0
    assert len(records) == 2
    failure_record, success_record = records

    # The failed connection is reported as an error record.
    failure = json.loads(failure_record[0])
    assert "error" in failure
    assert "type" in failure

    # The version command after it still succeeded.
    assert json.loads(success_record[0])["client"] == "mxgw-py"
+94 -3
View File
@@ -9,6 +9,7 @@
#![warn(missing_docs)]
use std::env;
use std::io::{self, BufRead, Write};
use std::path::PathBuf;
use std::process::ExitCode;
use std::time::Duration;
@@ -319,6 +320,13 @@ enum Command {
#[arg(long)]
json: bool,
},
/// Read commands from stdin, one per line, execute each in sequence, and
/// write `__MXGW_BATCH_EOR__` to stdout after every result. Errors are
/// written as `{"error":"…","type":"error"}` JSON to stdout (not stderr)
/// so the harness can parse them without interleaving stderr. The loop
/// never terminates on command error; only stdin EOF (or an empty line)
/// ends the session.
Batch,
#[command(subcommand)]
Galaxy(GalaxyCommand),
}
@@ -427,7 +435,11 @@ enum CliValueType {
#[tokio::main]
async fn main() -> ExitCode {
let cli = Cli::parse();
match run(cli).await {
let result = match cli.command {
Command::Batch => run_batch().await,
command => dispatch(command).await,
};
match result {
Ok(()) => ExitCode::SUCCESS,
Err(error) => {
eprintln!("{error}");
@@ -436,8 +448,17 @@ async fn main() -> ExitCode {
}
}
async fn run(cli: Cli) -> Result<(), Error> {
match cli.command {
/// Dispatch a parsed [`Command`] to its handler. All subcommands except
/// [`Command::Batch`] are handled here; `Batch` is handled separately in
/// `main` to avoid mutual recursion between `dispatch` and `run_batch`.
async fn dispatch(command: Command) -> Result<(), Error> {
match command {
Command::Batch => {
return Err(Error::InvalidArgument {
name: "batch".to_owned(),
detail: "batch cannot be nested inside another batch session".to_owned(),
});
}
Command::Version { json, .. } => print_version(json),
Command::Ping {
connection,
@@ -996,6 +1017,76 @@ async fn session_for(
Ok(client.session(session_id))
}
/// End-of-result sentinel written to stdout after every batch command.
const BATCH_EOR: &str = "__MXGW_BATCH_EOR__";
/// Run the batch loop: read one command line at a time from stdin, dispatch
/// each through the normal [`run`] path, and write [`BATCH_EOR`] to stdout
/// after every result. Errors are serialised as JSON to stdout so the
/// harness can parse them without interleaving stderr. The loop never
/// terminates on command error; only stdin EOF or an empty line ends the
/// session.
async fn run_batch() -> Result<(), Error> {
let stdin = io::stdin();
let stdout = io::stdout();
for line in stdin.lock().lines() {
let line = line.map_err(|e| Error::InvalidArgument {
name: "stdin".to_owned(),
detail: e.to_string(),
})?;
if line.is_empty() {
break;
}
let parts: Vec<&str> = line.split_ascii_whitespace().collect();
if parts.is_empty() {
println!("{BATCH_EOR}");
stdout.lock().flush().ok();
continue;
}
// Re-parse the split arguments under a fresh Cli, prepending the
// program-name placeholder so clap sees a complete argv[].
let parse_result =
Cli::try_parse_from(std::iter::once("mxgw-cli").chain(parts.iter().copied()));
let outcome: Result<(), Error> = match parse_result {
Ok(cli) => {
// Spawn on a new tokio task so each command runs with a fresh
// stack, avoiding stack overflow from the large dispatch future.
tokio::task::spawn(dispatch(cli.command))
.await
.unwrap_or_else(|join_err| {
Err(Error::InvalidArgument {
name: "task".to_owned(),
detail: join_err.to_string(),
})
})
}
Err(clap_err) => Err(Error::InvalidArgument {
name: "args".to_owned(),
detail: clap_err.to_string(),
}),
};
if let Err(err) = outcome {
// Write error as JSON to stdout so the harness sees it in the
// same stream as normal output, framed by the EOR sentinel.
println!(
"{}",
serde_json::json!({ "error": err.to_string(), "type": "error" })
);
}
println!("{BATCH_EOR}");
stdout.lock().flush().ok();
}
Ok(())
}
/// Cross-language ReadBulk stress benchmark — mirrors the .NET / Go / Python /
/// Java implementations so the PS driver collates one JSON schema across all
/// five clients.
+14 -7
View File
@@ -294,15 +294,22 @@ path and writes a JSON report under `artifacts/e2e/`:
write support (`MxAccessCommandExecutor` returning `InvalidRequest` for
`Write`/`Write2`/`WriteSecured`/`WriteSecured2`).
Each client CLI is driven through one long-lived `batch` process. Every CLI
exposes a `batch` subcommand: a process that reads one command line from stdin,
runs it through the normal subcommand dispatch, writes the JSON result, then a
line containing exactly `__MXGW_BATCH_EOR__`. The harness launches one such
process per client and pipes the ~250 operations of the flow through it, so the
process cold-start — and, for Java, the JVM start-up — is paid once per client
instead of once per operation. A command that fails inside the batch process
writes its `{"error":...}` envelope and the loop continues; the harness treats
that envelope as the operation failure (used by the parity and auth phases).
Before the per-client phases run, the script builds the .NET CLI
(`dotnet build`) and installs the Java CLI (`gradle :mxgateway-cli:installDist`)
once, then invokes the compiled artifacts directly. The matrix issues several
hundred CLI calls per client; invoking `dotnet run` / `gradle
:mxgateway-cli:run` per call rebuilds and cold-starts the toolchain every time,
which stretches the add-item/advise loop long enough for the worker event
channel to overflow under `FailFast` backpressure. The Go, Rust, and Python
clients still build on demand (`go run` / `cargo run` / `python -m`) because
their per-call startup is already sub-second.
once, so the `batch` process launches straight from the compiled exe / the
installed launcher. The Go, Rust, and Python batch processes are launched via
`go run` / `cargo run` / `python -m`, which compile-or-start once when that
single per-client process starts.
Build the gateway and worker, start the gateway, and provide a valid API key
before running the client e2e script:
+200 -30
View File
@@ -9,6 +9,11 @@ register, bulk subscribe/unsubscribe, per-tag add-item/advise, event
streaming, a write round-trip with value assertion, error-path (parity)
checks, and API-key auth rejection.
Each client CLI is driven through one long-lived `batch` process: the harness
writes one command line to its stdin and reads the JSON result back, so the
~250 operations per client pay the process (and JVM/runtime) cold-start once
instead of once per operation.
The gateway and worker are assumed to be already running at -Endpoint; the
script does not start or stop them.
#>
@@ -467,12 +472,11 @@ function Assert-BulkResults {
}
# Builds the dotnet and Java client CLIs once up front and records the path to
# each compiled artifact. The e2e matrix issues ~250 CLI calls per client;
# invoking `dotnet run` / `gradle :mxgateway-cli:run` per call rebuilds and
# cold-starts the toolchain every time, stretching the per-tag advise loop long
# enough for the worker event channel to overflow under the FailFast
# backpressure policy. Running the compiled artifact keeps per-call latency
# sub-second, matching the Go/Rust/Python paths.
# each compiled artifact, so the long-lived `batch` process is launched from
# the compiled exe / installed launcher without paying a `dotnet build` or
# `gradle` step at flow time. The Go, Rust, and Python batch processes are
# launched via `go run` / `cargo run` / `python -m`, which compile-or-start
# once when that single per-client process starts.
function Initialize-ClientBuilds {
if ($Clients -contains "dotnet") {
$cliProject = Join-Path $repoRoot "clients/dotnet/MxGateway.Client.Cli/MxGateway.Client.Cli.csproj"
@@ -801,6 +805,161 @@ function Get-DryRunReply {
}
}
# --- Batch-mode client process ---------------------------------------------
# The e2e flow issues ~250 operations per client. Spawning one CLI process per
# operation pays a process — and, for the JVM, a runtime — cold-start every
# time. Instead each client CLI exposes a `batch` subcommand: a single
# long-lived process that reads one command line from stdin, runs it, writes
# the JSON result, then a line containing exactly $batchTerminator. The harness
# drives that one process per client, so startup is paid once.
$script:batchTerminator = "__MXGW_BATCH_EOR__"
$script:currentBatchClient = $null
# A redirected child's StandardInput writer is created with Console.InputEncoding,
# which is UTF-8 *with a BOM* on this host. The writer then prepends that BOM to
# the first bytes it sends, and the CLIs parse it into their first argument.
# Switching the console input encoding to a BOM-less encoding before any batch
# process starts makes that writer BOM-free. The e2e command lines are ASCII.
try {
    # Process-wide setting: applied once, before any batch process is started.
    [Console]::InputEncoding = [System.Text.Encoding]::ASCII
} catch {
    # Non-fatal: the run continues with the existing input encoding and only
    # a warning is emitted.
    Write-Warning "Could not set a BOM-less console input encoding: $($_.Exception.Message)"
}
# Builds the launch description (file, args, cwd, env) for a client's
# long-lived `batch` process. It asks Get-ClientCommand for an `open-session`
# command line, keeps everything that precedes the operation token (for
# example `run -p mxgw-cli --`) and appends `batch` in place of the operation.
# Throws when the operation token cannot be found in the generated arguments.
function Get-BatchLaunchSpec {
    param([string]$Client)

    $probeOperation = "open-session"
    $probe = Get-ClientCommand -Client $Client -Operation $probeOperation -Values @{} -ApiKeyEnvName $ApiKeyEnv
    $probeArgs = [object[]]$probe.args

    $tokenAt = [Array]::IndexOf($probeArgs, $probeOperation)
    if ($tokenAt -lt 0) {
        throw "Cannot locate the operation token in the '$Client' command line."
    }

    # Arguments in front of the operation token form the launch prefix; when
    # the token is the very first argument there is no prefix at all.
    $launchArgs = @()
    if ($tokenAt -gt 0) {
        $launchArgs = @($probeArgs[0..($tokenAt - 1)])
    }
    $launchArgs = @($launchArgs + "batch")

    return [pscustomobject]@{
        file = $probe.file
        args = $launchArgs
        cwd  = $probe.cwd
        env  = $probe.env
    }
}
# Produces the argument list that is written to the batch process for one
# operation: the operation token followed by its flags, with the launch prefix
# removed.
function Get-ClientOperationArgs {
    param(
        [string]$Client,
        [string]$Operation,
        [hashtable]$Values,
        [string]$ApiKeyEnvName = $ApiKeyEnv
    )

    $resolved = Get-ClientCommand -Client $Client -Operation $Operation -Values $Values -ApiKeyEnvName $ApiKeyEnvName
    $allArgs = [object[]]$resolved.args
    $tokenAt = [Array]::IndexOf($allArgs, $Operation)

    if ($tokenAt -gt 0) {
        # Drop everything in front of the operation token.
        return @($allArgs[$tokenAt..($allArgs.Count - 1)])
    }

    # Token is already first (no prefix) or was not found at all; in both
    # cases the argument list is passed through unchanged.
    # NOTE(review): the not-found case forwards any launch prefix as-is -
    # confirm that is intended.
    return @($allArgs)
}
# Reports whether a parsed reply represents a failure. A missing reply ($null)
# counts as a failure, as does any reply whose top-level `error` value is
# non-empty once converted to a string.
function Test-OperationFailed {
    param([object]$Json)

    if ($null -ne $Json) {
        $reported = [string](Get-PropertyValue -Object $Json -Names @("error"))
        return -not [string]::IsNullOrEmpty($reported)
    }

    return $true
}
# Starts the long-lived `batch` process for a client and returns a handle with
# three members: `client` (the client name), `process` (the started
# System.Diagnostics.Process, whose StandardOutput is read by the caller) and
# `input` (the process's redirected StandardInput writer).
# Throws if the process cannot be started; the Process object is disposed first.
function Start-BatchClient {
    param([string]$Client)

    $spec = Get-BatchLaunchSpec -Client $Client

    $startInfo = [System.Diagnostics.ProcessStartInfo]::new()
    $startInfo.FileName = $spec.file
    $startInfo.Arguments = ($spec.args | ForEach-Object { ConvertTo-NativeArgument -Value $_ }) -join " "
    $startInfo.WorkingDirectory = $spec.cwd
    $startInfo.RedirectStandardInput = $true
    $startInfo.RedirectStandardOutput = $true
    # stderr is left attached to the console: the CLIs only log diagnostics
    # there, and not redirecting it removes any risk of the child blocking on a
    # full stderr pipe while the harness reads stdout.
    $startInfo.RedirectStandardError = $false
    $startInfo.UseShellExecute = $false

    # Ask for a BOM-less stdin writer explicitly, so the first command line is
    # not prefixed with a byte-order mark even when the console input encoding
    # could not be changed. The property does not exist on every runtime, so it
    # is only set when present.
    if ($startInfo.PSObject.Properties.Name -contains "StandardInputEncoding") {
        $startInfo.StandardInputEncoding = [System.Text.UTF8Encoding]::new($false)
    }

    foreach ($entry in $spec.env.GetEnumerator()) {
        $startInfo.Environment[$entry.Key] = [string]$entry.Value
    }

    $process = [System.Diagnostics.Process]::new()
    $process.StartInfo = $startInfo
    try {
        [void]$process.Start()
    } catch {
        # Do not leak the Process object when the launch itself fails.
        $process.Dispose()
        throw
    }

    return [pscustomobject]@{ client = $Client; process = $process; input = $process.StandardInput }
}
# Sends one operation to a batch process and returns its raw output text
# (every line written before the terminator line, each followed by a newline).
#
# The protocol is strictly one command per line, so the command line is
# validated before it is written: an empty line would be answered with no
# result (or end the batch session), and an embedded line break would be read
# as two commands and desynchronise every later reply.
#
# Throws when the command line is empty or contains a line break, and when the
# process closes its output before the terminator line is seen.
function Invoke-BatchOperation {
    param(
        [pscustomobject]$BatchClient,
        [string]$Client,
        [string]$Operation,
        [hashtable]$Values,
        [string]$ApiKeyEnvName
    )

    $operationArgs = Get-ClientOperationArgs -Client $Client -Operation $Operation `
        -Values $Values -ApiKeyEnvName $ApiKeyEnvName
    $commandLine = $operationArgs -join " "

    if ([string]::IsNullOrWhiteSpace($commandLine)) {
        throw "Batch client '$Client' operation '$Operation' produced an empty command line."
    }
    if ($commandLine -match '[\r\n]') {
        throw ("Batch client '$Client' operation '$Operation' has an argument containing a " +
            "line break, which the line-based batch protocol cannot carry.")
    }

    $process = $BatchClient.process
    $BatchClient.input.WriteLine($commandLine)
    $BatchClient.input.Flush()

    # Collect output lines until the terminator; a $null line means the child
    # closed stdout (typically because it exited) before finishing the reply.
    $builder = [System.Text.StringBuilder]::new()
    while ($true) {
        $line = $process.StandardOutput.ReadLine()
        if ($null -eq $line) {
            throw ("Batch client '$Client' closed its output before terminating operation " +
                "'$Operation' (process exited: $($process.HasExited)).")
        }
        if ($line -eq $script:batchTerminator) {
            break
        }
        [void]$builder.AppendLine($line)
    }

    return $builder.ToString()
}
# Shuts down a batch process: closes its stdin to signal end-of-input, gives it
# up to 15 seconds to exit, and kills the process tree if it does not. Any
# failure along the way falls back to a best-effort kill. The Process object is
# always disposed. A $null handle is ignored.
function Stop-BatchClient {
    param([pscustomobject]$BatchClient)

    if ($null -ne $BatchClient) {
        $child = $BatchClient.process
        try {
            $stillRunning = -not $child.HasExited
            if ($stillRunning) {
                $BatchClient.input.Close()
                $exitedInTime = $child.WaitForExit(15000)
                if (-not $exitedInTime) {
                    $child.Kill($true)
                }
            }
        } catch {
            # Best effort only: the process may already be gone.
            try { $child.Kill($true) } catch { }
        } finally {
            $child.Dispose()
        }
    }
}
function Invoke-ClientOperation {
param(
[string]$Client,
@@ -809,21 +968,27 @@ function Invoke-ClientOperation {
[string]$ApiKeyEnvName = $ApiKeyEnv
)
$command = Get-ClientCommand -Client $Client -Operation $Operation -Values $Values -ApiKeyEnvName $ApiKeyEnvName
$result = Invoke-NativeCommand `
-FilePath $command.file `
-Arguments $command.args `
-WorkingDirectory $command.cwd `
-Environment $command.env
if ($DryRun) {
$operationArgs = Get-ClientOperationArgs -Client $Client -Operation $Operation `
-Values $Values -ApiKeyEnvName $ApiKeyEnvName
Write-Host "[dry-run] (batch:$Client) $($operationArgs -join ' ')"
return Get-DryRunReply -Client $Client -Operation $Operation -Values $Values
}
return Read-JsonObject -Text $result.stdout
$stdout = Invoke-BatchOperation -BatchClient $script:currentBatchClient -Client $Client `
-Operation $Operation -Values $Values -ApiKeyEnvName $ApiKeyEnvName
$json = Read-JsonObject -Text $stdout
if (Test-OperationFailed -Json $json) {
$errorValue = Get-PropertyValue -Object $json -Names @("error")
throw "$Client $Operation failed: $errorValue"
}
return $json
}
# Runs a client operation that is expected to fail, returning the raw process
# result (exit code + stderr) without throwing. Under -DryRun a synthetic
# failure is returned so the parity and auth phases can be exercised offline.
# Runs a client operation that is expected to fail. Returns a record whose
# `failed` flag is true when the CLI reported its failure envelope. Under
# -DryRun a synthetic failure is returned so the parity and auth phases can be
# exercised offline.
function Invoke-ClientOperationExpectingFailure {
param(
[string]$Client,
@@ -833,18 +998,16 @@ function Invoke-ClientOperationExpectingFailure {
)
if ($DryRun) {
$command = Get-ClientCommand -Client $Client -Operation $Operation -Values $Values -ApiKeyEnvName $ApiKeyEnvName
Write-Host "[dry-run] $(Join-CommandLine -FilePath $command.file -Arguments $command.args)"
return [pscustomobject]@{ exitCode = 1; stdout = ""; stderr = "[dry-run] synthetic expected failure" }
$operationArgs = Get-ClientOperationArgs -Client $Client -Operation $Operation `
-Values $Values -ApiKeyEnvName $ApiKeyEnvName
Write-Host "[dry-run] (batch:$Client) $($operationArgs -join ' ')"
return [pscustomobject]@{ failed = $true; json = $null }
}
$command = Get-ClientCommand -Client $Client -Operation $Operation -Values $Values -ApiKeyEnvName $ApiKeyEnvName
return Invoke-NativeCommand `
-FilePath $command.file `
-Arguments $command.args `
-WorkingDirectory $command.cwd `
-Environment $command.env `
-AllowFailure
$stdout = Invoke-BatchOperation -BatchClient $script:currentBatchClient -Client $Client `
-Operation $Operation -Values $Values -ApiKeyEnvName $ApiKeyEnvName
$json = Read-JsonObject -Text $stdout
return [pscustomobject]@{ failed = (Test-OperationFailed -Json $json); json = $json }
}
# Connects a short-lived StreamEvents consumer so the gateway empties the worker
@@ -897,6 +1060,10 @@ function Invoke-ClientFlow {
}
try {
if (-not $DryRun) {
$script:currentBatchClient = Start-BatchClient -Client $Client
}
$openJson = Invoke-ClientOperation -Client $Client -Operation "open-session"
$sessionId = Get-OpenSessionId -Client $Client -Json $openJson
if ([string]::IsNullOrWhiteSpace($sessionId)) {
@@ -1138,11 +1305,10 @@ function Invoke-ClientFlow {
foreach ($parityCheck in $parityChecks) {
$parityResult = Invoke-ClientOperationExpectingFailure `
-Client $Client -Operation $parityCheck.operation -Values $parityCheck.values
$passed = $parityResult.exitCode -ne 0
$passed = [bool]$parityResult.failed
$clientResult.parity += [ordered]@{
check = $parityCheck.check
operation = $parityCheck.operation
exitCode = $parityResult.exitCode
passed = $passed
}
if (-not $passed) {
@@ -1165,10 +1331,9 @@ function Invoke-ClientFlow {
foreach ($authCheck in $authChecks) {
$authResult = Invoke-ClientOperationExpectingFailure `
-Client $Client -Operation "open-session" -ApiKeyEnvName $authCheck.apiKeyEnv
$passed = $authResult.exitCode -ne 0
$passed = [bool]$authResult.failed
$clientResult.auth += [ordered]@{
check = $authCheck.check
exitCode = $authResult.exitCode
passed = $passed
}
if (-not $passed) {
@@ -1190,6 +1355,11 @@ function Invoke-ClientFlow {
$clientResult.error = "$($clientResult.error) close-session failed: $($_.Exception.Message)"
}
}
if ($null -ne $script:currentBatchClient) {
Stop-BatchClient -BatchClient $script:currentBatchClient
$script:currentBatchClient = $null
}
}
return $clientResult