e2e: port batch subcommand to all five client CLIs

scripts/run-client-e2e-tests.ps1 expects each language CLI to expose a
`batch` subcommand that reads command lines from stdin, runs each
through the normal subcommand dispatch, writes the JSON result, then
a sentinel line `__MXGW_BATCH_EOR__`. The implementation lived on a
divergent branch (commit 6126099) that was never merged into main —
this commit ports the same protocol to HEAD's renamed CLIs so the
existing matrix script runs end-to-end.

The protocol:
  - one line of stdin = one full CLI invocation
  - successful output → stdout, then __MXGW_BATCH_EOR__
  - failure → {"error":"...","type":"error"} JSON on stdout, then
    __MXGW_BATCH_EOR__ (errors do NOT exit the loop)
  - empty line or EOF terminates the loop

Per-CLI additions:

  .NET: RunBatchAsync + per-line StringWriter capture, JSON error
    envelope when forceJsonErrors is true. Two new tests in
    MxGatewayClientCliTests covering the success and error paths.

  Go:   runBatch with bufio.Scanner, runs each line through the
    existing runWithIO switch with a buffered stdout writer. One new
    test pinning the EOR sentinel.

  Rust: new `Batch` variant on the clap Command enum, run_batch
    re-parses each line via Cli::try_parse_from. Two new tests in the
    inline mod tests block.

  Python: new `batch` click command in commands.py that uses
    CliRunner to dispatch each line; synthesises {"error",..."type"}
    JSON from click error messages when the captured output isn't
    already JSON-shaped. Three new tests in test_cli.py.

  Java: BatchCommand inner @Command with BufferedReader stdin loop,
    fresh commandLine() per dispatch with captured stdout/stderr
    PrintWriters; non-zero exit codes and uncaught exceptions both
    surface as JSON-error blocks. Two new tests.

Also fixes scripts/run-client-e2e-tests.ps1 line 705: the Python
invocation was still passing the old module name `mxgateway_cli` to
`python -m`; the client SDK rename in 397d3c5 moved it to
`zb_mom_ww_mxgateway_cli`. Without the fix the Python leg fails
with "No module named mxgateway_cli" before reaching open-session.

Verification: full matrix at the redeployed gateway (localhost:5120,
running ZB.MOM.WW.MxGateway.Server.exe / ZB.MOM.WW.MxGateway.Worker.exe)
with -SkipBulk -SkipReadWriteBulk -SkipParity -SkipAuth (those phases
exercise bulk read/write CLI subcommands that also live on the
divergent branch — porting those is a follow-up). All five clients
report `closed=true, addedItems=120, eventCount=5` and overall
`success=true`. Per-language unit tests pass:
  - dotnet: 59/59
  - go:     all packages clean
  - rust:   cargo test --workspace clean
  - python: 42/42
  - java:   gradle build SUCCESSFUL

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-24 04:08:15 -04:00
parent a68f0cf222
commit 71d2c39f01
10 changed files with 594 additions and 10 deletions
@@ -16,6 +16,8 @@ public static class MxGatewayClientCli
private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web);
private const string BatchEndOfRecord = "__MXGW_BATCH_EOR__";
/// <summary>Runs the CLI synchronously with the given arguments, writing output and errors.</summary>
/// <param name="args">Command-line arguments (command name followed by options).</param>
/// <param name="standardOutput">TextWriter for command output.</param>
@@ -25,7 +27,7 @@ public static class MxGatewayClientCli
TextWriter standardOutput,
TextWriter standardError)
{
return RunAsync(args, standardOutput, standardError)
return RunAsync(args, standardOutput, standardError, clientFactory: null, standardInput: null)
.GetAwaiter()
.GetResult();
}
@@ -35,11 +37,13 @@ public static class MxGatewayClientCli
/// <param name="standardOutput">TextWriter for command output.</param>
/// <param name="standardError">TextWriter for error messages.</param>
/// <param name="clientFactory">Optional factory to create the gateway client; defaults to MxGatewayClient.Create.</param>
/// <param name="standardInput">Optional TextReader for batch-mode stdin; defaults to <see cref="Console.In"/>.</param>
public static Task<int> RunAsync(
string[] args,
TextWriter standardOutput,
TextWriter standardError,
Func<MxGatewayClientOptions, IMxGatewayCliClient>? clientFactory = null)
Func<MxGatewayClientOptions, IMxGatewayCliClient>? clientFactory = null,
TextReader? standardInput = null)
{
ArgumentNullException.ThrowIfNull(args);
ArgumentNullException.ThrowIfNull(standardOutput);
@@ -49,14 +53,17 @@ public static class MxGatewayClientCli
args,
standardOutput,
standardError,
clientFactory ?? CreateDefaultClient);
clientFactory ?? CreateDefaultClient,
standardInput ?? Console.In);
}
private static async Task<int> RunCoreAsync(
string[] args,
TextWriter standardOutput,
TextWriter standardError,
Func<MxGatewayClientOptions, IMxGatewayCliClient> clientFactory)
Func<MxGatewayClientOptions, IMxGatewayCliClient> clientFactory,
TextReader standardInput,
bool forceJsonErrors = false)
{
if (args.Length is 0 || IsHelp(args[0]))
{
@@ -65,6 +72,12 @@ public static class MxGatewayClientCli
}
string command = args[0].ToLowerInvariant();
if (command is "batch")
{
return await RunBatchAsync(standardOutput, clientFactory, standardInput).ConfigureAwait(false);
}
CliArguments arguments = new(args.Skip(1));
try
@@ -125,7 +138,7 @@ public static class MxGatewayClientCli
string? apiKey = arguments.GetOptional("api-key");
string message = MxGatewayCliSecretRedactor.Redact(exception.Message, apiKey);
if (arguments.HasFlag("json"))
if (forceJsonErrors || arguments.HasFlag("json"))
{
standardError.WriteLine(JsonSerializer.Serialize(
new { error = message, type = exception.GetType().Name },
@@ -140,6 +153,82 @@ public static class MxGatewayClientCli
}
}
/// <summary>
/// Runs the CLI in batch mode: reads one command line at a time from
/// <paramref name="standardInput"/>, dispatches it through the normal
/// routing, writes all output to <paramref name="standardOutput"/>, and
/// then appends <see cref="BatchEndOfRecord"/> as a sentinel so the
/// caller can delimit command results. Continues on failure; errors are
/// written as JSON to <paramref name="standardOutput"/> (not stderr) so
/// that the harness sees them inside the same delimited block. Exits 0
/// on EOF or empty line.
/// </summary>
private static async Task<int> RunBatchAsync(
TextWriter standardOutput,
Func<MxGatewayClientOptions, IMxGatewayCliClient> clientFactory,
TextReader standardInput)
{
while (true)
{
string? line = await standardInput.ReadLineAsync().ConfigureAwait(false);
// EOF or empty line signals clean exit.
if (line is null || line.Length is 0)
{
return 0;
}
// Split on runs of ASCII whitespace — no quoting support by design.
string[] lineArgs = line.Split((char[]?)null, StringSplitOptions.RemoveEmptyEntries);
// Per-command output is buffered so we can redirect errors to stdout.
using StringWriter commandOutput = new();
// Errors in batch mode go to stdout (same delimited block), formatted as JSON.
// We use a capturing error writer and re-emit through commandOutput after the
// command returns, so the EOR sentinel always follows the complete result.
using StringWriter commandError = new();
try
{
await RunCoreAsync(
lineArgs,
commandOutput,
commandError,
clientFactory,
standardInput,
forceJsonErrors: true)
.ConfigureAwait(false);
}
catch (Exception exception) when (exception is not OperationCanceledException)
{
// Unexpected exception that escaped RunCoreAsync (shouldn't happen, but be safe).
commandError.WriteLine(JsonSerializer.Serialize(
new { error = exception.Message, type = exception.GetType().Name },
JsonOptions));
}
// Write any buffered normal output first.
string commandOutputText = commandOutput.ToString();
if (commandOutputText.Length > 0)
{
standardOutput.Write(commandOutputText);
}
// Then any error output — in batch mode it belongs on stdout so the harness
// sees it inside the delimited record.
string commandErrorText = commandError.ToString();
if (commandErrorText.Length > 0)
{
standardOutput.Write(commandErrorText);
}
// Write the end-of-record sentinel and flush so the harness can unblock.
standardOutput.WriteLine(BatchEndOfRecord);
await standardOutput.FlushAsync().ConfigureAwait(false);
}
}
private static IMxGatewayCliClient CreateDefaultClient(MxGatewayClientOptions options)
{
return new MxGatewayCliClientAdapter(MxGatewayClient.Create(options));
@@ -1032,6 +1121,7 @@ public static class MxGatewayClientCli
private static void WriteUsage(TextWriter writer)
{
writer.WriteLine("mxgw-dotnet batch (reads commands from stdin; writes output + __MXGW_BATCH_EOR__ after each)");
writer.WriteLine("mxgw-dotnet version [--json]");
writer.WriteLine("mxgw-dotnet ping --session-id <id> [--json]");
writer.WriteLine("mxgw-dotnet open-session [--client-name <name>] [--json]");
@@ -368,6 +368,66 @@ public sealed class MxGatewayClientCliTests
Assert.Contains("\"objectCount\": 99", text);
}
/// <summary>Verifies that batch mode dispatches a single version command and emits the EOR sentinel.</summary>
[Fact]
public async Task RunAsync_Batch_DispatchesVersionAndWritesEndOfRecord()
{
using var output = new StringWriter();
using var error = new StringWriter();
using var input = new StringReader("version --json\n");
int exitCode = await MxGatewayClientCli.RunAsync(
["batch"],
output,
error,
clientFactory: null,
standardInput: input);
Assert.Equal(0, exitCode);
string text = output.ToString();
Assert.Contains("\"gatewayProtocolVersion\":3", text);
Assert.Contains("__MXGW_BATCH_EOR__", text);
// The EOR marker must come after the JSON output.
int jsonIndex = text.IndexOf("\"gatewayProtocolVersion\"", StringComparison.Ordinal);
int eorIndex = text.IndexOf("__MXGW_BATCH_EOR__", StringComparison.Ordinal);
Assert.True(jsonIndex >= 0 && eorIndex > jsonIndex);
Assert.Equal(string.Empty, error.ToString());
}
/// <summary>Verifies that batch mode routes per-command errors to stdout as JSON between EOR markers.</summary>
[Fact]
public async Task RunAsync_Batch_WritesErrorsToStdoutAsJson()
{
using var output = new StringWriter();
using var error = new StringWriter();
// Unknown command should produce an error on the captured error stream,
// which batch mode re-emits to stdout inside the same delimited block.
using var input = new StringReader("nope-not-a-command\nversion\n");
int exitCode = await MxGatewayClientCli.RunAsync(
["batch"],
output,
error,
clientFactory: null,
standardInput: input);
Assert.Equal(0, exitCode);
string text = output.ToString();
// Two records → two EOR markers.
int firstEor = text.IndexOf("__MXGW_BATCH_EOR__", StringComparison.Ordinal);
int secondEor = text.IndexOf(
"__MXGW_BATCH_EOR__",
firstEor + 1,
StringComparison.Ordinal);
Assert.True(firstEor > 0);
Assert.True(secondEor > firstEor);
// The unknown-command error message must be on stdout (not on stderr).
Assert.Contains("nope-not-a-command", text);
Assert.DoesNotContain("nope-not-a-command", error.ToString());
// The follow-up `version` line should still succeed.
Assert.Contains("gateway-protocol=", text);
}
/// <summary>Fake CLI client for testing.</summary>
private sealed class FakeCliClient : IMxGatewayCliClient
{
+40 -1
View File
@@ -6,6 +6,7 @@
package main
import (
"bufio"
"context"
"encoding/json"
"errors"
@@ -103,6 +104,8 @@ func runWithIO(ctx context.Context, args []string, stdout, stderr io.Writer) err
return runGalaxyDiscover(ctx, args[1:], stdout, stderr)
case "galaxy-watch":
return runGalaxyWatch(ctx, args[1:], stdout, stderr)
case "batch":
return runBatch(ctx, os.Stdin, stdout, stderr)
default:
writeUsage(stderr)
return fmt.Errorf("unknown command %q", args[0])
@@ -666,7 +669,43 @@ type protojsonMessage interface {
}
func writeUsage(writer io.Writer) {
fmt.Fprintln(writer, "usage: mxgw-go <version|open-session|close-session|register|add-item|advise|subscribe-bulk|unsubscribe-bulk|write|stream-events|smoke|galaxy-test-connection|galaxy-last-deploy|galaxy-discover|galaxy-watch>")
fmt.Fprintln(writer, "usage: mxgw-go <version|open-session|close-session|register|add-item|advise|subscribe-bulk|unsubscribe-bulk|write|stream-events|smoke|galaxy-test-connection|galaxy-last-deploy|galaxy-discover|galaxy-watch|batch>")
}
// batchEOR is the end-of-result sentinel emitted to stdout after every command
// in batch mode, regardless of success or failure.
const batchEOR = "__MXGW_BATCH_EOR__"
// runBatch reads one command line at a time from in, dispatches each via the
// normal runWithIO routing, and writes a batchEOR sentinel to stdout after
// every result. Errors are serialised as JSON to stdout (not stderr) so the
// harness can parse them without interleaving stderr. The loop never terminates
// on command error; only stdin EOF (or an empty line) ends the session.
func runBatch(ctx context.Context, in io.Reader, stdout, stderr io.Writer) error {
bw := bufio.NewWriter(stdout)
scanner := bufio.NewScanner(in)
for scanner.Scan() {
line := scanner.Text()
if line == "" {
break
}
args := strings.Fields(line)
if len(args) == 0 {
continue
}
if err := runWithIO(ctx, args, bw, stderr); err != nil {
// Write error as JSON to stdout (bw) so the harness sees it in the
// same stream as normal output, framed by the EOR sentinel.
errPayload := map[string]string{
"error": err.Error(),
"type": "error",
}
_ = writeJSON(bw, errPayload)
}
_, _ = fmt.Fprintln(bw, batchEOR)
_ = bw.Flush()
}
return scanner.Err()
}
func dialGalaxyForCommand(ctx context.Context, common *commonOptions) (*mxgateway.GalaxyClient, commonOptions, error) {
+28
View File
@@ -47,6 +47,34 @@ func TestCommonOptionsRedactsAPIKey(t *testing.T) {
}
}
func TestRunBatchEmitsEORAfterVersion(t *testing.T) {
var stdout bytes.Buffer
var stderr bytes.Buffer
in := strings.NewReader("version --json\n")
if err := runBatch(t.Context(), in, &stdout, &stderr); err != nil {
t.Fatalf("runBatch() error = %v; stderr = %s", err, stderr.String())
}
out := stdout.String()
if !strings.Contains(out, "\n"+batchEOR+"\n") && !strings.HasSuffix(out, batchEOR+"\n") {
t.Fatalf("expected EOR marker %q in stdout; got: %q", batchEOR, out)
}
idx := strings.Index(out, batchEOR)
if idx <= 0 {
t.Fatalf("EOR marker not found or appeared before any output: %q", out)
}
payload := out[:idx]
var output versionOutput
if err := json.Unmarshal([]byte(payload), &output); err != nil {
t.Fatalf("parse JSON block before EOR: %v (payload=%q)", err, payload)
}
if output.GatewayProtocolVersion == 0 || output.WorkerProtocolVersion == 0 {
t.Fatalf("protocol versions were not populated: %+v", output)
}
}
func TestParseValueBuildsTypedValue(t *testing.T) {
value, err := parseValue("int32", "123")
if err != nil {
@@ -14,7 +14,11 @@ import galaxy_repository.v1.GalaxyRepositoryOuterClass.GalaxyAttribute;
import galaxy_repository.v1.GalaxyRepositoryOuterClass.GalaxyObject;
import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
@@ -116,9 +120,90 @@ public final class MxGatewayCli implements Callable<Integer> {
commandLine.addSubcommand("galaxy-deploy-time", new GalaxyDeployTimeCommand());
commandLine.addSubcommand("galaxy-discover", new GalaxyDiscoverCommand());
commandLine.addSubcommand("galaxy-watch", new GalaxyWatchCommand());
commandLine.addSubcommand("batch", new BatchCommand(clientFactory));
return commandLine;
}
/** Sentinel written to stdout after every command result in batch mode. */
static final String BATCH_EOR = "__MXGW_BATCH_EOR__";
/**
* Reads one CLI invocation per stdin line, executes each via a fresh
* {@link CommandLine}, and writes {@value #BATCH_EOR} to stdout after
* every result. Errors are written as JSON to stdout so the harness
* sees them in the same stream, delimited by the same sentinel. The
* loop never terminates on command failure; only stdin EOF (or an
* empty line) ends the session.
*/
@Command(name = "batch", description = "Reads CLI invocations from stdin and executes them sequentially.")
static final class BatchCommand implements Callable<Integer> {
private final MxGatewayCliClientFactory clientFactory;
@Spec
private CommandSpec spec;
BatchCommand(MxGatewayCliClientFactory clientFactory) {
this.clientFactory = clientFactory;
}
@Override
public Integer call() {
PrintWriter out = spec.commandLine().getOut();
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(System.in, StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
if (line.isEmpty()) {
break;
}
String[] args = line.trim().split("\\s+");
if (args.length == 0 || (args.length == 1 && args[0].isEmpty())) {
continue;
}
StringWriter cmdOut = new StringWriter();
StringWriter cmdErr = new StringWriter();
PrintWriter cmdOutWriter = new PrintWriter(cmdOut, true);
PrintWriter cmdErrWriter = new PrintWriter(cmdErr, true);
try {
CommandLine cmd = commandLine(clientFactory);
cmd.setOut(cmdOutWriter);
cmd.setErr(cmdErrWriter);
int exitCode = cmd.execute(args);
cmdOutWriter.flush();
cmdErrWriter.flush();
String cmdOutput = cmdOut.toString();
if (!cmdOutput.isEmpty()) {
out.print(cmdOutput);
}
if (exitCode != 0) {
// Non-zero exit: emit the stderr content (if any) as a JSON
// error object to stdout so the harness can parse it in the
// same delimited stream.
String errText = cmdErr.toString().trim();
if (errText.isEmpty()) {
errText = "command exited with code " + exitCode;
}
Map<String, Object> errorPayload = new LinkedHashMap<>();
errorPayload.put("error", errText);
errorPayload.put("type", "error");
out.println(jsonObject(errorPayload));
}
} catch (Exception ex) {
Map<String, Object> errorPayload = new LinkedHashMap<>();
errorPayload.put("error", ex.getMessage() != null ? ex.getMessage() : ex.getClass().getName());
errorPayload.put("type", "error");
out.println(jsonObject(errorPayload));
}
out.println(BATCH_EOR);
out.flush();
}
} catch (java.io.IOException ex) {
// Stdin closed unexpectedly — treat as EOF and exit normally.
}
return 0;
}
}
abstract static class GalaxyCommand implements Callable<Integer> {
@Mixin
CommonOptions common = new CommonOptions();
@@ -4,8 +4,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import mxaccess_gateway.v1.MxaccessGateway.AddItemReply;
@@ -141,6 +144,47 @@ final class MxGatewayCliTests {
assertTrue(run.output().contains("\"wasSuccessful\":true"));
}
@Test
void batchCommandExecutesVersionAndEmitsEorMarker() {
CliRun run = executeBatch(new FakeClientFactory(), "version --json\n");
assertEquals(0, run.exitCode());
String out = run.output();
assertTrue(out.contains("\"clientVersion\""), out);
assertTrue(out.contains(MxGatewayCli.BATCH_EOR), out);
}
@Test
void batchCommandEmitsEorAfterFailedCommandAndContinues() {
// An unknown subcommand causes a picocli parse error (non-zero exit).
// The loop must still emit BATCH_EOR for the failure and continue
// processing the subsequent valid command.
CliRun run = executeBatch(new FakeClientFactory(), "no-such-subcommand\nversion --json\n");
assertEquals(0, run.exitCode());
String out = run.output();
long eorCount = out.lines()
.filter(l -> l.equals(MxGatewayCli.BATCH_EOR))
.count();
assertEquals(2, eorCount, "expected exactly 2 EOR sentinels, got: " + eorCount + "\nOutput:\n" + out);
assertTrue(out.contains("\"clientVersion\""), out);
}
/**
* Runs the CLI with {@code batch} as the subcommand, using the provided
* string as standard input content. Temporarily replaces {@link System#in}
* for the duration of the call.
*/
private static CliRun executeBatch(MxGatewayCli.MxGatewayCliClientFactory factory, String stdinContent) {
InputStream originalIn = System.in;
try {
System.setIn(new ByteArrayInputStream(stdinContent.getBytes(StandardCharsets.UTF_8)));
return execute(factory, "batch");
} finally {
System.setIn(originalIn);
}
}
private static CliRun execute(MxGatewayCli.MxGatewayCliClientFactory factory, String... args) {
StringWriter output = new StringWriter();
StringWriter errors = new StringWriter();
@@ -5,11 +5,13 @@ from __future__ import annotations
import asyncio
import json
import os
import sys
from collections.abc import Awaitable, Callable
from datetime import datetime, timezone
from typing import Any
import click
from click.testing import CliRunner
from google.protobuf.json_format import MessageToDict
from zb_mom_ww_mxgateway import __version__
@@ -22,6 +24,8 @@ from zb_mom_ww_mxgateway.values import MxValueInput
MAX_AGGREGATE_EVENTS = 10_000
_BATCH_EOR = "__MXGW_BATCH_EOR__"
@click.group()
def main() -> None:
@@ -41,6 +45,80 @@ def version(output_json: bool) -> None:
_emit(payload, output_json=output_json, text=f"mxgw-py {__version__}")
@main.command()
def batch() -> None:
"""Read commands from stdin and execute each, writing output + __MXGW_BATCH_EOR__ after each.
Each non-empty line of stdin is a complete argument string (no quoting support — the
harness never passes whitespace-containing arguments). Lines are split on runs of ASCII
whitespace and dispatched through the normal CLI parser. On EOF or an empty line, exit 0.
Errors do NOT terminate the loop. Each command's output (including any error JSON) is
written to stdout followed by a line containing exactly ``__MXGW_BATCH_EOR__``, then
stdout is flushed. Error output is formatted as ``{"error": "...", "type": "..."}``.
"""
runner = CliRunner()
for raw_line in sys.stdin:
line = raw_line.rstrip("\n").rstrip("\r")
if not line:
# Empty line signals clean exit (matches the spec and .NET behaviour).
break
args = line.split()
try:
result = runner.invoke(main, args, catch_exceptions=True)
except Exception as exc: # noqa: BLE001 — be safe; never let batch loop die
_batch_write_error(exc.__class__.__name__, str(exc))
_batch_flush_eor()
continue
if result.exit_code == 0:
# Normal success — write captured output as-is.
sys.stdout.write(result.output)
else:
# Something went wrong. If the command already emitted a JSON object
# (e.g. the output starts with '{'), trust that and relay it verbatim.
# Otherwise synthesise the standard {"error": ..., "type": ...} shape.
output = result.output or ""
exc = result.exception
if output.lstrip().startswith("{"):
# Already JSON — relay verbatim (may or may not end with newline).
sys.stdout.write(output)
if not output.endswith("\n"):
sys.stdout.write("\n")
elif exc is not None and not isinstance(exc, SystemExit):
_batch_write_error(type(exc).__name__, str(exc))
else:
# Click's default error format is "Error: <message>\n"; extract the
# message so the harness gets clean JSON.
msg = output.strip()
if msg.startswith("Error: "):
msg = msg[len("Error: "):]
exc_type = (
type(exc).__name__
if exc is not None and not isinstance(exc, SystemExit)
else "CliError"
)
_batch_write_error(exc_type, msg)
_batch_flush_eor()
def _batch_write_error(exc_type: str, message: str) -> None:
"""Write a JSON error record to stdout in the standard batch error shape."""
sys.stdout.write(json.dumps({"error": message, "type": exc_type}) + "\n")
def _batch_flush_eor() -> None:
"""Write the end-of-record sentinel and flush stdout."""
sys.stdout.write(_BATCH_EOR + "\n")
sys.stdout.flush()
def gateway_options(command: Callable[..., Any]) -> Callable[..., Any]:
"""Apply the shared gateway connection options to a Click command."""
command = click.option("--endpoint", default="localhost:5000", show_default=True)(command)
+58
View File
@@ -7,6 +7,8 @@ from click.testing import CliRunner
from zb_mom_ww_mxgateway import __version__
from zb_mom_ww_mxgateway_cli.commands import main
_BATCH_EOR = "__MXGW_BATCH_EOR__"
def test_version_json_is_deterministic() -> None:
runner = CliRunner()
@@ -66,3 +68,59 @@ def test_cli_error_output_redacts_api_key() -> None:
assert result.exit_code != 0
assert "mxgw_test_secret" not in result.output
def test_batch_runs_version_command_and_writes_eor() -> None:
runner = CliRunner()
result = runner.invoke(main, ["batch"], input="version --json\n")
assert result.exit_code == 0
blocks = [block for block in result.output.split(_BATCH_EOR + "\n") if block]
assert len(blocks) == 1
payload = json.loads(blocks[0].strip())
assert payload == {
"client": "mxgw-py",
"package": "mxaccess-gateway-client",
"version": __version__,
}
def test_batch_terminates_on_empty_line() -> None:
runner = CliRunner()
result = runner.invoke(
main,
["batch"],
input="version --json\n\nversion --json\n",
)
assert result.exit_code == 0
# Only the first command runs; the empty line breaks the loop before the second.
assert result.output.count(_BATCH_EOR) == 1
def test_batch_continues_after_error_line() -> None:
runner = CliRunner()
# First line is invalid (unknown subcommand), second is a valid version call.
result = runner.invoke(
main,
["batch"],
input="not-a-real-command\nversion --json\n",
)
assert result.exit_code == 0
assert result.output.count(_BATCH_EOR) == 2
blocks = [block for block in result.output.split(_BATCH_EOR + "\n") if block]
assert len(blocks) == 2
# First block: error JSON ({"error": "...", "type": "..."}).
error_payload = json.loads(blocks[0].strip().splitlines()[-1])
assert "error" in error_payload
assert "type" in error_payload
# Second block: successful version JSON.
version_payload = json.loads(blocks[1].strip())
assert version_payload["version"] == __version__
+105 -3
View File
@@ -9,6 +9,7 @@
#![warn(missing_docs)]
use std::env;
use std::io::{self, BufRead, Write};
use std::path::PathBuf;
use std::process::ExitCode;
use std::time::Duration;
@@ -189,6 +190,13 @@ enum Command {
#[arg(long)]
json: bool,
},
/// Read commands from stdin, one per line, execute each in sequence, and
/// write `__MXGW_BATCH_EOR__` to stdout after every result. Errors are
/// written as `{"error":"…","type":"error"}` JSON to stdout (not stderr)
/// so the harness can parse them without interleaving stderr. The loop
/// never terminates on command error; only stdin EOF (or an empty line)
/// ends the session.
Batch,
#[command(subcommand)]
Galaxy(GalaxyCommand),
}
@@ -297,7 +305,11 @@ enum CliValueType {
#[tokio::main]
async fn main() -> ExitCode {
let cli = Cli::parse();
match run(cli).await {
let result = match cli.command {
Command::Batch => run_batch().await,
command => dispatch(command).await,
};
match result {
Ok(()) => ExitCode::SUCCESS,
Err(error) => {
eprintln!("{error}");
@@ -306,8 +318,17 @@ async fn main() -> ExitCode {
}
}
async fn run(cli: Cli) -> Result<(), Error> {
match cli.command {
/// Dispatch a parsed [`Command`] to its handler. All subcommands except
/// [`Command::Batch`] are handled here; `Batch` is handled separately in
/// `main` to avoid mutual recursion between `dispatch` and `run_batch`.
async fn dispatch(command: Command) -> Result<(), Error> {
match command {
Command::Batch => {
return Err(Error::InvalidArgument {
name: "batch".to_owned(),
detail: "batch cannot be nested inside another batch session".to_owned(),
});
}
Command::Version { json, .. } => print_version(json),
Command::Ping {
connection,
@@ -706,6 +727,76 @@ async fn session_for(
Ok(client.session(session_id))
}
/// End-of-result sentinel written to stdout after every batch command.
const BATCH_EOR: &str = "__MXGW_BATCH_EOR__";
/// Run the batch loop: read one command line at a time from stdin, dispatch
/// each through the normal [`dispatch`] path, and write [`BATCH_EOR`] to
/// stdout after every result. Errors are serialised as JSON to stdout so
/// the harness can parse them without interleaving stderr. The loop never
/// terminates on command error; only stdin EOF or an empty line ends the
/// session.
async fn run_batch() -> Result<(), Error> {
let stdin = io::stdin();
let stdout = io::stdout();
for line in stdin.lock().lines() {
let line = line.map_err(|e| Error::InvalidArgument {
name: "stdin".to_owned(),
detail: e.to_string(),
})?;
if line.is_empty() {
break;
}
let parts: Vec<&str> = line.split_ascii_whitespace().collect();
if parts.is_empty() {
println!("{BATCH_EOR}");
stdout.lock().flush().ok();
continue;
}
// Re-parse the split arguments under a fresh Cli, prepending the
// program-name placeholder so clap sees a complete argv[].
let parse_result =
Cli::try_parse_from(std::iter::once("mxgw-cli").chain(parts.iter().copied()));
let outcome: Result<(), Error> = match parse_result {
Ok(cli) => {
// Spawn on a new tokio task so each command runs with a fresh
// stack, avoiding stack overflow from the large dispatch future.
tokio::task::spawn(dispatch(cli.command))
.await
.unwrap_or_else(|join_err| {
Err(Error::InvalidArgument {
name: "task".to_owned(),
detail: join_err.to_string(),
})
})
}
Err(clap_err) => Err(Error::InvalidArgument {
name: "args".to_owned(),
detail: clap_err.to_string(),
}),
};
if let Err(err) = outcome {
// Write error as JSON to stdout so the harness sees it in the
// same stream as normal output, framed by the EOR sentinel.
println!(
"{}",
serde_json::json!({ "error": err.to_string(), "type": "error" })
);
}
println!("{BATCH_EOR}");
stdout.lock().flush().ok();
}
Ok(())
}
fn print_version(use_json: bool) {
if use_json {
println!("{}", version_json());
@@ -1073,6 +1164,17 @@ mod tests {
assert!(parsed.is_ok(), "parse failed: {parsed:?}");
}
#[test]
fn parses_batch_command() {
let parsed = Cli::try_parse_from(["mxgw", "batch"]);
assert!(parsed.is_ok(), "parse failed: {parsed:?}");
}
#[test]
fn batch_eor_marker_is_stable() {
assert_eq!(super::BATCH_EOR, "__MXGW_BATCH_EOR__");
}
#[test]
fn rfc3339_parser_round_trips_z_and_offset_inputs() {
// 2026-04-28T15:30:00Z = 1_777_995_000 (sanity-checked once below)
+1 -1
View File
@@ -702,7 +702,7 @@ function Get-ClientCommand {
}
"python" {
$arguments = @(
"-m", "mxgateway_cli", $Operation,
"-m", "zb_mom_ww_mxgateway_cli", $Operation,
"--endpoint", $hostEndpoint,
"--api-key-env", $ApiKeyEnvName,
"--plaintext",