From 71d2c39f01e36a382d20ea592bc84071eff05004 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 24 May 2026 04:08:15 -0400 Subject: [PATCH] e2e: port `batch` subcommand to all five client CLIs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit scripts/run-client-e2e-tests.ps1 expects each language CLI to expose a `batch` subcommand that reads command lines from stdin, runs each through the normal subcommand dispatch, writes the JSON result, then a sentinel line `__MXGW_BATCH_EOR__`. The implementation lived on a divergent branch (commit 6126099) that was never merged into main — this commit ports the same protocol to HEAD's renamed CLIs so the existing matrix script runs end-to-end. The protocol: - one line of stdin = one full CLI invocation - successful output → stdout, then __MXGW_BATCH_EOR__ - failure → {"error":"...","type":"error"} JSON on stdout, then __MXGW_BATCH_EOR__ (errors do NOT exit the loop) - empty line or EOF terminates the loop Per-CLI additions: .NET: RunBatchAsync + per-line StringWriter capture, JSON error envelope when forceJsonErrors is true. Two new tests in MxGatewayClientCliTests covering the success and error paths. Go: runBatch with bufio.Scanner, runs each line through the existing runWithIO switch with a buffered stdout writer. One new test pinning the EOR sentinel. Rust: new `Batch` variant on the clap Command enum, run_batch re-parses each line via Cli::try_parse_from. Two new tests in the inline mod tests block. Python: new `batch` click command in commands.py that uses CliRunner to dispatch each line; synthesises {"error",..."type"} JSON from click error messages when the captured output isn't already JSON-shaped. Three new tests in test_cli.py. Java: BatchCommand inner @Command with BufferedReader stdin loop, fresh commandLine() per dispatch with captured stdout/stderr PrintWriters; non-zero exit codes and uncaught exceptions both surface as JSON-error blocks. Two new tests. Also fixes scripts/run-client-e2e-tests.ps1 line 705: the Python invocation was still passing the old module name `mxgateway_cli` to `python -m`; the client SDK rename in 397d3c5 moved it to `zb_mom_ww_mxgateway_cli`. Without the fix the Python leg fails with "No module named mxgateway_cli" before reaching open-session. Verification: full matrix at the redeployed gateway (localhost:5120, running ZB.MOM.WW.MxGateway.Server.exe / ZB.MOM.WW.MxGateway.Worker.exe) with -SkipBulk -SkipReadWriteBulk -SkipParity -SkipAuth (those phases exercise bulk read/write CLI subcommands that also live on the divergent branch — porting those is a follow-up). All five clients report `closed=true, addedItems=120, eventCount=5` and overall `success=true`. Per-language unit tests pass: - dotnet: 59/59 - go: all packages clean - rust: cargo test --workspace clean - python: 42/42 - java: gradle build SUCCESSFUL Co-Authored-By: Claude Opus 4.7 (1M context) --- .../MxGatewayClientCli.cs | 100 +++++++++++++++- .../MxGatewayClientCliTests.cs | 60 ++++++++++ clients/go/cmd/mxgw-go/main.go | 41 ++++++- clients/go/cmd/mxgw-go/main_test.go | 28 +++++ .../zb/mom/ww/mxgateway/cli/MxGatewayCli.java | 85 ++++++++++++++ .../ww/mxgateway/cli/MxGatewayCliTests.java | 44 +++++++ .../src/zb_mom_ww_mxgateway_cli/commands.py | 78 +++++++++++++ clients/python/tests/test_cli.py | 58 ++++++++++ clients/rust/crates/mxgw-cli/src/main.rs | 108 +++++++++++++++++- scripts/run-client-e2e-tests.ps1 | 2 +- 10 files changed, 594 insertions(+), 10 deletions(-) diff --git a/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Cli/MxGatewayClientCli.cs b/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Cli/MxGatewayClientCli.cs index 9f7dffa..a9d794f 100644 --- a/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Cli/MxGatewayClientCli.cs +++ b/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Cli/MxGatewayClientCli.cs @@ -16,6 +16,8 @@ public static class MxGatewayClientCli private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web); + private const string BatchEndOfRecord = "__MXGW_BATCH_EOR__"; + /// Runs the CLI synchronously with the given arguments, writing output and errors. /// Command-line arguments (command name followed by options). /// TextWriter for command output. @@ -25,7 +27,7 @@ public static class MxGatewayClientCli TextWriter standardOutput, TextWriter standardError) { - return RunAsync(args, standardOutput, standardError) + return RunAsync(args, standardOutput, standardError, clientFactory: null, standardInput: null) .GetAwaiter() .GetResult(); } @@ -35,11 +37,13 @@ public static class MxGatewayClientCli /// TextWriter for command output. /// TextWriter for error messages. /// Optional factory to create the gateway client; defaults to MxGatewayClient.Create. + /// Optional TextReader for batch-mode stdin; defaults to . public static Task RunAsync( string[] args, TextWriter standardOutput, TextWriter standardError, - Func? clientFactory = null) + Func? clientFactory = null, + TextReader? standardInput = null) { ArgumentNullException.ThrowIfNull(args); ArgumentNullException.ThrowIfNull(standardOutput); @@ -49,14 +53,17 @@ public static class MxGatewayClientCli args, standardOutput, standardError, - clientFactory ?? CreateDefaultClient); + clientFactory ?? CreateDefaultClient, + standardInput ?? Console.In); } private static async Task RunCoreAsync( string[] args, TextWriter standardOutput, TextWriter standardError, - Func clientFactory) + Func clientFactory, + TextReader standardInput, + bool forceJsonErrors = false) { if (args.Length is 0 || IsHelp(args[0])) { @@ -65,6 +72,12 @@ public static class MxGatewayClientCli } string command = args[0].ToLowerInvariant(); + + if (command is "batch") + { + return await RunBatchAsync(standardOutput, clientFactory, standardInput).ConfigureAwait(false); + } + CliArguments arguments = new(args.Skip(1)); try @@ -125,7 +138,7 @@ public static class MxGatewayClientCli string? apiKey = arguments.GetOptional("api-key"); string message = MxGatewayCliSecretRedactor.Redact(exception.Message, apiKey); - if (arguments.HasFlag("json")) + if (forceJsonErrors || arguments.HasFlag("json")) { standardError.WriteLine(JsonSerializer.Serialize( new { error = message, type = exception.GetType().Name }, @@ -140,6 +153,82 @@ public static class MxGatewayClientCli } } + /// + /// Runs the CLI in batch mode: reads one command line at a time from + /// , dispatches it through the normal + /// routing, writes all output to , and + /// then appends as a sentinel so the + /// caller can delimit command results. Continues on failure; errors are + /// written as JSON to (not stderr) so + /// that the harness sees them inside the same delimited block. Exits 0 + /// on EOF or empty line. + /// + private static async Task RunBatchAsync( + TextWriter standardOutput, + Func clientFactory, + TextReader standardInput) + { + while (true) + { + string? line = await standardInput.ReadLineAsync().ConfigureAwait(false); + + // EOF or empty line signals clean exit. + if (line is null || line.Length is 0) + { + return 0; + } + + // Split on runs of ASCII whitespace — no quoting support by design. + string[] lineArgs = line.Split((char[]?)null, StringSplitOptions.RemoveEmptyEntries); + + // Per-command output is buffered so we can redirect errors to stdout. + using StringWriter commandOutput = new(); + + // Errors in batch mode go to stdout (same delimited block), formatted as JSON. + // We use a capturing error writer and re-emit through commandOutput after the + // command returns, so the EOR sentinel always follows the complete result. + using StringWriter commandError = new(); + + try + { + await RunCoreAsync( + lineArgs, + commandOutput, + commandError, + clientFactory, + standardInput, + forceJsonErrors: true) + .ConfigureAwait(false); + } + catch (Exception exception) when (exception is not OperationCanceledException) + { + // Unexpected exception that escaped RunCoreAsync (shouldn't happen, but be safe). + commandError.WriteLine(JsonSerializer.Serialize( + new { error = exception.Message, type = exception.GetType().Name }, + JsonOptions)); + } + + // Write any buffered normal output first. + string commandOutputText = commandOutput.ToString(); + if (commandOutputText.Length > 0) + { + standardOutput.Write(commandOutputText); + } + + // Then any error output — in batch mode it belongs on stdout so the harness + // sees it inside the delimited record. + string commandErrorText = commandError.ToString(); + if (commandErrorText.Length > 0) + { + standardOutput.Write(commandErrorText); + } + + // Write the end-of-record sentinel and flush so the harness can unblock. + standardOutput.WriteLine(BatchEndOfRecord); + await standardOutput.FlushAsync().ConfigureAwait(false); + } + } + private static IMxGatewayCliClient CreateDefaultClient(MxGatewayClientOptions options) { return new MxGatewayCliClientAdapter(MxGatewayClient.Create(options)); @@ -1032,6 +1121,7 @@ public static class MxGatewayClientCli private static void WriteUsage(TextWriter writer) { + writer.WriteLine("mxgw-dotnet batch (reads commands from stdin; writes output + __MXGW_BATCH_EOR__ after each)"); writer.WriteLine("mxgw-dotnet version [--json]"); writer.WriteLine("mxgw-dotnet ping --session-id [--json]"); writer.WriteLine("mxgw-dotnet open-session [--client-name ] [--json]"); diff --git a/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Tests/MxGatewayClientCliTests.cs b/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Tests/MxGatewayClientCliTests.cs index 49d2a2e..d6122ca 100644 --- a/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Tests/MxGatewayClientCliTests.cs +++ b/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Tests/MxGatewayClientCliTests.cs @@ -368,6 +368,66 @@ public sealed class MxGatewayClientCliTests Assert.Contains("\"objectCount\": 99", text); } + /// Verifies that batch mode dispatches a single version command and emits the EOR sentinel. + [Fact] + public async Task RunAsync_Batch_DispatchesVersionAndWritesEndOfRecord() + { + using var output = new StringWriter(); + using var error = new StringWriter(); + using var input = new StringReader("version --json\n"); + + int exitCode = await MxGatewayClientCli.RunAsync( + ["batch"], + output, + error, + clientFactory: null, + standardInput: input); + + Assert.Equal(0, exitCode); + string text = output.ToString(); + Assert.Contains("\"gatewayProtocolVersion\":3", text); + Assert.Contains("__MXGW_BATCH_EOR__", text); + // The EOR marker must come after the JSON output. + int jsonIndex = text.IndexOf("\"gatewayProtocolVersion\"", StringComparison.Ordinal); + int eorIndex = text.IndexOf("__MXGW_BATCH_EOR__", StringComparison.Ordinal); + Assert.True(jsonIndex >= 0 && eorIndex > jsonIndex); + Assert.Equal(string.Empty, error.ToString()); + } + + /// Verifies that batch mode routes per-command errors to stdout as JSON between EOR markers. + [Fact] + public async Task RunAsync_Batch_WritesErrorsToStdoutAsJson() + { + using var output = new StringWriter(); + using var error = new StringWriter(); + // Unknown command should produce an error on the captured error stream, + // which batch mode re-emits to stdout inside the same delimited block. + using var input = new StringReader("nope-not-a-command\nversion\n"); + + int exitCode = await MxGatewayClientCli.RunAsync( + ["batch"], + output, + error, + clientFactory: null, + standardInput: input); + + Assert.Equal(0, exitCode); + string text = output.ToString(); + // Two records → two EOR markers. + int firstEor = text.IndexOf("__MXGW_BATCH_EOR__", StringComparison.Ordinal); + int secondEor = text.IndexOf( + "__MXGW_BATCH_EOR__", + firstEor + 1, + StringComparison.Ordinal); + Assert.True(firstEor > 0); + Assert.True(secondEor > firstEor); + // The unknown-command error message must be on stdout (not on stderr). + Assert.Contains("nope-not-a-command", text); + Assert.DoesNotContain("nope-not-a-command", error.ToString()); + // The follow-up `version` line should still succeed. + Assert.Contains("gateway-protocol=", text); + } + /// Fake CLI client for testing. private sealed class FakeCliClient : IMxGatewayCliClient { diff --git a/clients/go/cmd/mxgw-go/main.go b/clients/go/cmd/mxgw-go/main.go index 0fff337..a07ac82 100644 --- a/clients/go/cmd/mxgw-go/main.go +++ b/clients/go/cmd/mxgw-go/main.go @@ -6,6 +6,7 @@ package main import ( + "bufio" "context" "encoding/json" "errors" @@ -103,6 +104,8 @@ func runWithIO(ctx context.Context, args []string, stdout, stderr io.Writer) err return runGalaxyDiscover(ctx, args[1:], stdout, stderr) case "galaxy-watch": return runGalaxyWatch(ctx, args[1:], stdout, stderr) + case "batch": + return runBatch(ctx, os.Stdin, stdout, stderr) default: writeUsage(stderr) return fmt.Errorf("unknown command %q", args[0]) @@ -666,7 +669,43 @@ type protojsonMessage interface { } func writeUsage(writer io.Writer) { - fmt.Fprintln(writer, "usage: mxgw-go ") + fmt.Fprintln(writer, "usage: mxgw-go ") +} + +// batchEOR is the end-of-result sentinel emitted to stdout after every command +// in batch mode, regardless of success or failure. +const batchEOR = "__MXGW_BATCH_EOR__" + +// runBatch reads one command line at a time from in, dispatches each via the +// normal runWithIO routing, and writes a batchEOR sentinel to stdout after +// every result. Errors are serialised as JSON to stdout (not stderr) so the +// harness can parse them without interleaving stderr. The loop never terminates +// on command error; only stdin EOF (or an empty line) ends the session. +func runBatch(ctx context.Context, in io.Reader, stdout, stderr io.Writer) error { + bw := bufio.NewWriter(stdout) + scanner := bufio.NewScanner(in) + for scanner.Scan() { + line := scanner.Text() + if line == "" { + break + } + args := strings.Fields(line) + if len(args) == 0 { + continue + } + if err := runWithIO(ctx, args, bw, stderr); err != nil { + // Write error as JSON to stdout (bw) so the harness sees it in the + // same stream as normal output, framed by the EOR sentinel. + errPayload := map[string]string{ + "error": err.Error(), + "type": "error", + } + _ = writeJSON(bw, errPayload) + } + _, _ = fmt.Fprintln(bw, batchEOR) + _ = bw.Flush() + } + return scanner.Err() } func dialGalaxyForCommand(ctx context.Context, common *commonOptions) (*mxgateway.GalaxyClient, commonOptions, error) { diff --git a/clients/go/cmd/mxgw-go/main_test.go b/clients/go/cmd/mxgw-go/main_test.go index 945cf09..f34292a 100644 --- a/clients/go/cmd/mxgw-go/main_test.go +++ b/clients/go/cmd/mxgw-go/main_test.go @@ -47,6 +47,34 @@ func TestCommonOptionsRedactsAPIKey(t *testing.T) { } } +func TestRunBatchEmitsEORAfterVersion(t *testing.T) { + var stdout bytes.Buffer + var stderr bytes.Buffer + + in := strings.NewReader("version --json\n") + if err := runBatch(t.Context(), in, &stdout, &stderr); err != nil { + t.Fatalf("runBatch() error = %v; stderr = %s", err, stderr.String()) + } + + out := stdout.String() + if !strings.Contains(out, "\n"+batchEOR+"\n") && !strings.HasSuffix(out, batchEOR+"\n") { + t.Fatalf("expected EOR marker %q in stdout; got: %q", batchEOR, out) + } + + idx := strings.Index(out, batchEOR) + if idx <= 0 { + t.Fatalf("EOR marker not found or appeared before any output: %q", out) + } + payload := out[:idx] + var output versionOutput + if err := json.Unmarshal([]byte(payload), &output); err != nil { + t.Fatalf("parse JSON block before EOR: %v (payload=%q)", err, payload) + } + if output.GatewayProtocolVersion == 0 || output.WorkerProtocolVersion == 0 { + t.Fatalf("protocol versions were not populated: %+v", output) + } +} + func TestParseValueBuildsTypedValue(t *testing.T) { value, err := parseValue("int32", "123") if err != nil { diff --git a/clients/java/zb-mom-ww-mxgateway-cli/src/main/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCli.java b/clients/java/zb-mom-ww-mxgateway-cli/src/main/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCli.java index 703d314..802854a 100644 --- a/clients/java/zb-mom-ww-mxgateway-cli/src/main/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCli.java +++ b/clients/java/zb-mom-ww-mxgateway-cli/src/main/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCli.java @@ -14,7 +14,11 @@ import galaxy_repository.v1.GalaxyRepositoryOuterClass.GalaxyAttribute; import galaxy_repository.v1.GalaxyRepositoryOuterClass.GalaxyObject; import com.google.protobuf.Message; import com.google.protobuf.util.JsonFormat; +import java.io.BufferedReader; +import java.io.InputStreamReader; import java.io.PrintWriter; +import java.io.StringWriter; +import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.time.Duration; import java.time.Instant; @@ -116,9 +120,90 @@ public final class MxGatewayCli implements Callable { commandLine.addSubcommand("galaxy-deploy-time", new GalaxyDeployTimeCommand()); commandLine.addSubcommand("galaxy-discover", new GalaxyDiscoverCommand()); commandLine.addSubcommand("galaxy-watch", new GalaxyWatchCommand()); + commandLine.addSubcommand("batch", new BatchCommand(clientFactory)); return commandLine; } + /** Sentinel written to stdout after every command result in batch mode. */ + static final String BATCH_EOR = "__MXGW_BATCH_EOR__"; + + /** + * Reads one CLI invocation per stdin line, executes each via a fresh + * {@link CommandLine}, and writes {@value #BATCH_EOR} to stdout after + * every result. Errors are written as JSON to stdout so the harness + * sees them in the same stream, delimited by the same sentinel. The + * loop never terminates on command failure; only stdin EOF (or an + * empty line) ends the session. + */ + @Command(name = "batch", description = "Reads CLI invocations from stdin and executes them sequentially.") + static final class BatchCommand implements Callable { + private final MxGatewayCliClientFactory clientFactory; + + @Spec + private CommandSpec spec; + + BatchCommand(MxGatewayCliClientFactory clientFactory) { + this.clientFactory = clientFactory; + } + + @Override + public Integer call() { + PrintWriter out = spec.commandLine().getOut(); + try (BufferedReader reader = new BufferedReader( + new InputStreamReader(System.in, StandardCharsets.UTF_8))) { + String line; + while ((line = reader.readLine()) != null) { + if (line.isEmpty()) { + break; + } + String[] args = line.trim().split("\\s+"); + if (args.length == 0 || (args.length == 1 && args[0].isEmpty())) { + continue; + } + StringWriter cmdOut = new StringWriter(); + StringWriter cmdErr = new StringWriter(); + PrintWriter cmdOutWriter = new PrintWriter(cmdOut, true); + PrintWriter cmdErrWriter = new PrintWriter(cmdErr, true); + try { + CommandLine cmd = commandLine(clientFactory); + cmd.setOut(cmdOutWriter); + cmd.setErr(cmdErrWriter); + int exitCode = cmd.execute(args); + cmdOutWriter.flush(); + cmdErrWriter.flush(); + String cmdOutput = cmdOut.toString(); + if (!cmdOutput.isEmpty()) { + out.print(cmdOutput); + } + if (exitCode != 0) { + // Non-zero exit: emit the stderr content (if any) as a JSON + // error object to stdout so the harness can parse it in the + // same delimited stream. + String errText = cmdErr.toString().trim(); + if (errText.isEmpty()) { + errText = "command exited with code " + exitCode; + } + Map errorPayload = new LinkedHashMap<>(); + errorPayload.put("error", errText); + errorPayload.put("type", "error"); + out.println(jsonObject(errorPayload)); + } + } catch (Exception ex) { + Map errorPayload = new LinkedHashMap<>(); + errorPayload.put("error", ex.getMessage() != null ? ex.getMessage() : ex.getClass().getName()); + errorPayload.put("type", "error"); + out.println(jsonObject(errorPayload)); + } + out.println(BATCH_EOR); + out.flush(); + } + } catch (java.io.IOException ex) { + // Stdin closed unexpectedly — treat as EOF and exit normally. + } + return 0; + } + } + abstract static class GalaxyCommand implements Callable { @Mixin CommonOptions common = new CommonOptions(); diff --git a/clients/java/zb-mom-ww-mxgateway-cli/src/test/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCliTests.java b/clients/java/zb-mom-ww-mxgateway-cli/src/test/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCliTests.java index f74e5a7..1d11046 100644 --- a/clients/java/zb-mom-ww-mxgateway-cli/src/test/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCliTests.java +++ b/clients/java/zb-mom-ww-mxgateway-cli/src/test/java/com/zb/mom/ww/mxgateway/cli/MxGatewayCliTests.java @@ -4,8 +4,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.io.ByteArrayInputStream; +import java.io.InputStream; import java.io.PrintWriter; import java.io.StringWriter; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import mxaccess_gateway.v1.MxaccessGateway.AddItemReply; @@ -141,6 +144,47 @@ final class MxGatewayCliTests { assertTrue(run.output().contains("\"wasSuccessful\":true")); } + @Test + void batchCommandExecutesVersionAndEmitsEorMarker() { + CliRun run = executeBatch(new FakeClientFactory(), "version --json\n"); + + assertEquals(0, run.exitCode()); + String out = run.output(); + assertTrue(out.contains("\"clientVersion\""), out); + assertTrue(out.contains(MxGatewayCli.BATCH_EOR), out); + } + + @Test + void batchCommandEmitsEorAfterFailedCommandAndContinues() { + // An unknown subcommand causes a picocli parse error (non-zero exit). + // The loop must still emit BATCH_EOR for the failure and continue + // processing the subsequent valid command. + CliRun run = executeBatch(new FakeClientFactory(), "no-such-subcommand\nversion --json\n"); + + assertEquals(0, run.exitCode()); + String out = run.output(); + long eorCount = out.lines() + .filter(l -> l.equals(MxGatewayCli.BATCH_EOR)) + .count(); + assertEquals(2, eorCount, "expected exactly 2 EOR sentinels, got: " + eorCount + "\nOutput:\n" + out); + assertTrue(out.contains("\"clientVersion\""), out); + } + + /** + * Runs the CLI with {@code batch} as the subcommand, using the provided + * string as standard input content. Temporarily replaces {@link System#in} + * for the duration of the call. + */ + private static CliRun executeBatch(MxGatewayCli.MxGatewayCliClientFactory factory, String stdinContent) { + InputStream originalIn = System.in; + try { + System.setIn(new ByteArrayInputStream(stdinContent.getBytes(StandardCharsets.UTF_8))); + return execute(factory, "batch"); + } finally { + System.setIn(originalIn); + } + } + private static CliRun execute(MxGatewayCli.MxGatewayCliClientFactory factory, String... args) { StringWriter output = new StringWriter(); StringWriter errors = new StringWriter(); diff --git a/clients/python/src/zb_mom_ww_mxgateway_cli/commands.py b/clients/python/src/zb_mom_ww_mxgateway_cli/commands.py index dbe8407..f655987 100644 --- a/clients/python/src/zb_mom_ww_mxgateway_cli/commands.py +++ b/clients/python/src/zb_mom_ww_mxgateway_cli/commands.py @@ -5,11 +5,13 @@ from __future__ import annotations import asyncio import json import os +import sys from collections.abc import Awaitable, Callable from datetime import datetime, timezone from typing import Any import click +from click.testing import CliRunner from google.protobuf.json_format import MessageToDict from zb_mom_ww_mxgateway import __version__ @@ -22,6 +24,8 @@ from zb_mom_ww_mxgateway.values import MxValueInput MAX_AGGREGATE_EVENTS = 10_000 +_BATCH_EOR = "__MXGW_BATCH_EOR__" + @click.group() def main() -> None: @@ -41,6 +45,80 @@ def version(output_json: bool) -> None: _emit(payload, output_json=output_json, text=f"mxgw-py {__version__}") +@main.command() +def batch() -> None: + """Read commands from stdin and execute each, writing output + __MXGW_BATCH_EOR__ after each. + + Each non-empty line of stdin is a complete argument string (no quoting support — the + harness never passes whitespace-containing arguments). Lines are split on runs of ASCII + whitespace and dispatched through the normal CLI parser. On EOF or an empty line, exit 0. + + Errors do NOT terminate the loop. Each command's output (including any error JSON) is + written to stdout followed by a line containing exactly ``__MXGW_BATCH_EOR__``, then + stdout is flushed. Error output is formatted as ``{"error": "...", "type": "..."}``. + """ + + runner = CliRunner() + + for raw_line in sys.stdin: + line = raw_line.rstrip("\n").rstrip("\r") + if not line: + # Empty line signals clean exit (matches the spec and .NET behaviour). + break + + args = line.split() + + try: + result = runner.invoke(main, args, catch_exceptions=True) + except Exception as exc: # noqa: BLE001 — be safe; never let batch loop die + _batch_write_error(exc.__class__.__name__, str(exc)) + _batch_flush_eor() + continue + + if result.exit_code == 0: + # Normal success — write captured output as-is. + sys.stdout.write(result.output) + else: + # Something went wrong. If the command already emitted a JSON object + # (e.g. the output starts with '{'), trust that and relay it verbatim. + # Otherwise synthesise the standard {"error": ..., "type": ...} shape. + output = result.output or "" + exc = result.exception + + if output.lstrip().startswith("{"): + # Already JSON — relay verbatim (may or may not end with newline). + sys.stdout.write(output) + if not output.endswith("\n"): + sys.stdout.write("\n") + elif exc is not None and not isinstance(exc, SystemExit): + _batch_write_error(type(exc).__name__, str(exc)) + else: + # Click's default error format is "Error: \n"; extract the + # message so the harness gets clean JSON. + msg = output.strip() + if msg.startswith("Error: "): + msg = msg[len("Error: "):] + exc_type = ( + type(exc).__name__ + if exc is not None and not isinstance(exc, SystemExit) + else "CliError" + ) + _batch_write_error(exc_type, msg) + + _batch_flush_eor() + + +def _batch_write_error(exc_type: str, message: str) -> None: + """Write a JSON error record to stdout in the standard batch error shape.""" + sys.stdout.write(json.dumps({"error": message, "type": exc_type}) + "\n") + + +def _batch_flush_eor() -> None: + """Write the end-of-record sentinel and flush stdout.""" + sys.stdout.write(_BATCH_EOR + "\n") + sys.stdout.flush() + + def gateway_options(command: Callable[..., Any]) -> Callable[..., Any]: """Apply the shared gateway connection options to a Click command.""" command = click.option("--endpoint", default="localhost:5000", show_default=True)(command) diff --git a/clients/python/tests/test_cli.py b/clients/python/tests/test_cli.py index 729cb10..65e61e1 100644 --- a/clients/python/tests/test_cli.py +++ b/clients/python/tests/test_cli.py @@ -7,6 +7,8 @@ from click.testing import CliRunner from zb_mom_ww_mxgateway import __version__ from zb_mom_ww_mxgateway_cli.commands import main +_BATCH_EOR = "__MXGW_BATCH_EOR__" + def test_version_json_is_deterministic() -> None: runner = CliRunner() @@ -66,3 +68,59 @@ def test_cli_error_output_redacts_api_key() -> None: assert result.exit_code != 0 assert "mxgw_test_secret" not in result.output + + +def test_batch_runs_version_command_and_writes_eor() -> None: + runner = CliRunner() + + result = runner.invoke(main, ["batch"], input="version --json\n") + + assert result.exit_code == 0 + blocks = [block for block in result.output.split(_BATCH_EOR + "\n") if block] + assert len(blocks) == 1 + payload = json.loads(blocks[0].strip()) + assert payload == { + "client": "mxgw-py", + "package": "mxaccess-gateway-client", + "version": __version__, + } + + +def test_batch_terminates_on_empty_line() -> None: + runner = CliRunner() + + result = runner.invoke( + main, + ["batch"], + input="version --json\n\nversion --json\n", + ) + + assert result.exit_code == 0 + # Only the first command runs; the empty line breaks the loop before the second. + assert result.output.count(_BATCH_EOR) == 1 + + +def test_batch_continues_after_error_line() -> None: + runner = CliRunner() + + # First line is invalid (unknown subcommand), second is a valid version call. + result = runner.invoke( + main, + ["batch"], + input="not-a-real-command\nversion --json\n", + ) + + assert result.exit_code == 0 + assert result.output.count(_BATCH_EOR) == 2 + + blocks = [block for block in result.output.split(_BATCH_EOR + "\n") if block] + assert len(blocks) == 2 + + # First block: error JSON ({"error": "...", "type": "..."}). + error_payload = json.loads(blocks[0].strip().splitlines()[-1]) + assert "error" in error_payload + assert "type" in error_payload + + # Second block: successful version JSON. + version_payload = json.loads(blocks[1].strip()) + assert version_payload["version"] == __version__ diff --git a/clients/rust/crates/mxgw-cli/src/main.rs b/clients/rust/crates/mxgw-cli/src/main.rs index 1d8fa20..ca7a16e 100644 --- a/clients/rust/crates/mxgw-cli/src/main.rs +++ b/clients/rust/crates/mxgw-cli/src/main.rs @@ -9,6 +9,7 @@ #![warn(missing_docs)] use std::env; +use std::io::{self, BufRead, Write}; use std::path::PathBuf; use std::process::ExitCode; use std::time::Duration; @@ -189,6 +190,13 @@ enum Command { #[arg(long)] json: bool, }, + /// Read commands from stdin, one per line, execute each in sequence, and + /// write `__MXGW_BATCH_EOR__` to stdout after every result. Errors are + /// written as `{"error":"…","type":"error"}` JSON to stdout (not stderr) + /// so the harness can parse them without interleaving stderr. The loop + /// never terminates on command error; only stdin EOF (or an empty line) + /// ends the session. + Batch, #[command(subcommand)] Galaxy(GalaxyCommand), } @@ -297,7 +305,11 @@ enum CliValueType { #[tokio::main] async fn main() -> ExitCode { let cli = Cli::parse(); - match run(cli).await { + let result = match cli.command { + Command::Batch => run_batch().await, + command => dispatch(command).await, + }; + match result { Ok(()) => ExitCode::SUCCESS, Err(error) => { eprintln!("{error}"); @@ -306,8 +318,17 @@ async fn main() -> ExitCode { } } -async fn run(cli: Cli) -> Result<(), Error> { - match cli.command { +/// Dispatch a parsed [`Command`] to its handler. All subcommands except +/// [`Command::Batch`] are handled here; `Batch` is handled separately in +/// `main` to avoid mutual recursion between `dispatch` and `run_batch`. +async fn dispatch(command: Command) -> Result<(), Error> { + match command { + Command::Batch => { + return Err(Error::InvalidArgument { + name: "batch".to_owned(), + detail: "batch cannot be nested inside another batch session".to_owned(), + }); + } Command::Version { json, .. } => print_version(json), Command::Ping { connection, @@ -706,6 +727,76 @@ async fn session_for( Ok(client.session(session_id)) } +/// End-of-result sentinel written to stdout after every batch command. +const BATCH_EOR: &str = "__MXGW_BATCH_EOR__"; + +/// Run the batch loop: read one command line at a time from stdin, dispatch +/// each through the normal [`dispatch`] path, and write [`BATCH_EOR`] to +/// stdout after every result. Errors are serialised as JSON to stdout so +/// the harness can parse them without interleaving stderr. The loop never +/// terminates on command error; only stdin EOF or an empty line ends the +/// session. +async fn run_batch() -> Result<(), Error> { + let stdin = io::stdin(); + let stdout = io::stdout(); + + for line in stdin.lock().lines() { + let line = line.map_err(|e| Error::InvalidArgument { + name: "stdin".to_owned(), + detail: e.to_string(), + })?; + + if line.is_empty() { + break; + } + + let parts: Vec<&str> = line.split_ascii_whitespace().collect(); + if parts.is_empty() { + println!("{BATCH_EOR}"); + stdout.lock().flush().ok(); + continue; + } + + // Re-parse the split arguments under a fresh Cli, prepending the + // program-name placeholder so clap sees a complete argv[]. + let parse_result = + Cli::try_parse_from(std::iter::once("mxgw-cli").chain(parts.iter().copied())); + + let outcome: Result<(), Error> = match parse_result { + Ok(cli) => { + // Spawn on a new tokio task so each command runs with a fresh + // stack, avoiding stack overflow from the large dispatch future. + tokio::task::spawn(dispatch(cli.command)) + .await + .unwrap_or_else(|join_err| { + Err(Error::InvalidArgument { + name: "task".to_owned(), + detail: join_err.to_string(), + }) + }) + } + Err(clap_err) => Err(Error::InvalidArgument { + name: "args".to_owned(), + detail: clap_err.to_string(), + }), + }; + + if let Err(err) = outcome { + // Write error as JSON to stdout so the harness sees it in the + // same stream as normal output, framed by the EOR sentinel. + println!( + "{}", + serde_json::json!({ "error": err.to_string(), "type": "error" }) + ); + } + + println!("{BATCH_EOR}"); + stdout.lock().flush().ok(); + } + + Ok(()) +} + fn print_version(use_json: bool) { if use_json { println!("{}", version_json()); @@ -1073,6 +1164,17 @@ mod tests { assert!(parsed.is_ok(), "parse failed: {parsed:?}"); } + #[test] + fn parses_batch_command() { + let parsed = Cli::try_parse_from(["mxgw", "batch"]); + assert!(parsed.is_ok(), "parse failed: {parsed:?}"); + } + + #[test] + fn batch_eor_marker_is_stable() { + assert_eq!(super::BATCH_EOR, "__MXGW_BATCH_EOR__"); + } + #[test] fn rfc3339_parser_round_trips_z_and_offset_inputs() { // 2026-04-28T15:30:00Z = 1_777_995_000 (sanity-checked once below) diff --git a/scripts/run-client-e2e-tests.ps1 b/scripts/run-client-e2e-tests.ps1 index 71b8e19..a5e8214 100644 --- a/scripts/run-client-e2e-tests.ps1 +++ b/scripts/run-client-e2e-tests.ps1 @@ -702,7 +702,7 @@ function Get-ClientCommand { } "python" { $arguments = @( - "-m", "mxgateway_cli", $Operation, + "-m", "zb_mom_ww_mxgateway_cli", $Operation, "--endpoint", $hostEndpoint, "--api-key-env", $ApiKeyEnvName, "--plaintext",