diff --git a/clients/dotnet/MxGateway.Client.Cli/MxGatewayClientCli.cs b/clients/dotnet/MxGateway.Client.Cli/MxGatewayClientCli.cs
index 8032d2a..d9ec103 100644
--- a/clients/dotnet/MxGateway.Client.Cli/MxGatewayClientCli.cs
+++ b/clients/dotnet/MxGateway.Client.Cli/MxGatewayClientCli.cs
@@ -111,6 +111,8 @@ public static class MxGatewayClientCli
                     .ConfigureAwait(false),
                 "write-secured2-bulk" => await WriteSecured2BulkAsync(arguments, client, standardOutput, cancellation.Token)
                     .ConfigureAwait(false),
+                "bench-read-bulk" => await BenchReadBulkAsync(arguments, client, standardOutput, cancellation.Token)
+                    .ConfigureAwait(false),
                 "stream-events" => await StreamEventsAsync(arguments, client, standardOutput, cancellation.Token)
                     .ConfigureAwait(false),
                 "write" => await WriteAsync(arguments, client, standardOutput, cancellation.Token)
@@ -581,6 +583,255 @@ public static class MxGatewayClientCli
             cancellationToken);
     }
 
+    /// <summary>
+    /// Cross-language stress benchmark for ReadBulk. Opens its own session,
+    /// subscribes to N tags so the worker's MxAccessValueCache populates from
+    /// real OnDataChange events, then hammers ReadBulk in a tight in-process
+    /// loop with per-call Stopwatch timing. Emits a single JSON object on
+    /// stdout that the scripts/bench-read-bulk.ps1 driver collates across
+    /// all five language clients.
+    /// </summary>
+    /// <param name="arguments">Parsed CLI arguments carrying the bench options.</param>
+    /// <param name="client">Gateway client used for the session and every command.</param>
+    /// <param name="output">Writer that receives the JSON stats line.</param>
+    /// <param name="cancellationToken">Cancels the run; a cancelled call ends the bench.</param>
+    /// <returns>0 once the stats line has been written.</returns>
+    /// <exception cref="ArgumentException">bulk-size is less than 1.</exception>
+    private static async Task<int> BenchReadBulkAsync(
+        CliArguments arguments,
+        IMxGatewayCliClient client,
+        TextWriter output,
+        CancellationToken cancellationToken)
+    {
+        int durationSeconds = arguments.GetInt32("duration-seconds", 30);
+        int warmupSeconds = arguments.GetInt32("warmup-seconds", 3);
+        int bulkSize = arguments.GetInt32("bulk-size", 6);
+        int tagStart = arguments.GetInt32("tag-start", 1);
+        string tagPrefix = arguments.GetOptional("tag-prefix") ?? "TestMachine_";
+        string tagAttribute = arguments.GetOptional("tag-attribute") ?? "TestChangingInt";
+        uint timeoutMs = (uint)arguments.GetInt32("timeout-ms", 1500);
+        string clientName = arguments.GetOptional("client-name") ?? "mxgw-dotnet-bench";
+
+        if (bulkSize < 1)
+        {
+            throw new ArgumentException("bulk-size must be positive.");
+        }
+
+        string[] tags = new string[bulkSize];
+        for (int i = 0; i < bulkSize; i++)
+        {
+            // TestMachine_NNN.<attribute>, three-digit machine numbers matching
+            // the existing e2e tag-discovery convention.
+            tags[i] = $"{tagPrefix}{(tagStart + i):D3}.{tagAttribute}";
+        }
+
+        // Open + register + subscribe-bulk so the cache populates before the
+        // measurement window opens.
+        OpenSessionReply openReply = await client.OpenSessionAsync(
+            new OpenSessionRequest { ClientSessionName = clientName, ClientCorrelationId = CreateCorrelationId() },
+            cancellationToken)
+            .ConfigureAwait(false);
+        string sessionId = openReply.SessionId;
+
+        try
+        {
+            MxCommandReply registerReply = await InvokeAndEnsureAsync(
+                client,
+                CreateCommandRequest(sessionId, new MxCommand
+                {
+                    Kind = MxCommandKind.Register,
+                    Register = new RegisterCommand { ClientName = clientName },
+                }),
+                cancellationToken)
+                .ConfigureAwait(false);
+            int serverHandle = registerReply.Register?.ServerHandle ?? registerReply.ReturnValue.Int32Value;
+
+            SubscribeBulkCommand subscribe = new() { ServerHandle = serverHandle };
+            subscribe.TagAddresses.Add(tags);
+            MxCommandReply subscribeReply = await InvokeAndEnsureAsync(
+                client,
+                CreateCommandRequest(sessionId, new MxCommand
+                {
+                    Kind = MxCommandKind.SubscribeBulk,
+                    SubscribeBulk = subscribe,
+                }),
+                cancellationToken)
+                .ConfigureAwait(false);
+            int[] itemHandles = subscribeReply.SubscribeBulk?.Results
+                .Where(r => r.WasSuccessful)
+                .Select(r => r.ItemHandle)
+                .ToArray() ?? [];
+
+            // Warm-up: drive the same call shape so the JIT / connection
+            // pipelines settle; timed on a monotonic Stopwatch, not the wall clock.
+            System.Diagnostics.Stopwatch warmupClock = System.Diagnostics.Stopwatch.StartNew();
+            ReadBulkCommand readBulkCommand = new()
+            {
+                ServerHandle = serverHandle,
+                TimeoutMs = timeoutMs,
+            };
+            readBulkCommand.TagAddresses.Add(tags);
+            MxCommand readBulkMxCommand = new() { Kind = MxCommandKind.ReadBulk, ReadBulk = readBulkCommand };
+
+            while (warmupClock.Elapsed.TotalSeconds < warmupSeconds)
+            {
+                _ = await client.InvokeAsync(
+                    CreateCommandRequest(sessionId, readBulkMxCommand),
+                    cancellationToken)
+                    .ConfigureAwait(false);
+            }
+
+            // Steady state — capture per-call wall latency with a high-res
+            // Stopwatch so the resolution is sub-millisecond on modern Windows.
+            List<double> latencyMillis = new(capacity: 65536);
+            long totalReadResults = 0;
+            long cachedReadResults = 0;
+            int successfulCalls = 0;
+            int failedCalls = 0;
+            System.Diagnostics.Stopwatch steadyClock = System.Diagnostics.Stopwatch.StartNew();
+
+            while (steadyClock.Elapsed.TotalSeconds < durationSeconds)
+            {
+                System.Diagnostics.Stopwatch sw = System.Diagnostics.Stopwatch.StartNew();
+                MxCommandReply reply;
+                try
+                {
+                    reply = await client.InvokeAsync(
+                        CreateCommandRequest(sessionId, readBulkMxCommand),
+                        cancellationToken)
+                        .ConfigureAwait(false);
+                    sw.Stop();
+                }
+                catch (Exception) when (!cancellationToken.IsCancellationRequested)
+                {
+                    // Only genuine call failures are counted; once cancellation is
+                    // requested the exception propagates and ends the run.
+                    sw.Stop();
+                    failedCalls++;
+                    latencyMillis.Add(sw.Elapsed.TotalMilliseconds);
+                    continue;
+                }
+
+                latencyMillis.Add(sw.Elapsed.TotalMilliseconds);
+                if (reply.ProtocolStatus?.Code != ProtocolStatusCode.Ok)
+                {
+                    failedCalls++;
+                    continue;
+                }
+
+                successfulCalls++;
+                if (reply.ReadBulk is not null)
+                {
+                    foreach (BulkReadResult r in reply.ReadBulk.Results)
+                    {
+                        totalReadResults++;
+                        if (r.WasCached)
+                        {
+                            cachedReadResults++;
+                        }
+                    }
+                }
+            }
+
+            double steadyElapsedSeconds = steadyClock.Elapsed.TotalSeconds;
+
+            if (itemHandles.Length > 0)
+            {
+                UnsubscribeBulkCommand unsubscribe = new() { ServerHandle = serverHandle };
+                unsubscribe.ItemHandles.Add(itemHandles);
+                _ = await client.InvokeAsync(
+                    CreateCommandRequest(sessionId, new MxCommand
+                    {
+                        Kind = MxCommandKind.UnsubscribeBulk,
+                        UnsubscribeBulk = unsubscribe,
+                    }),
+                    cancellationToken)
+                    .ConfigureAwait(false);
+            }
+
+            int totalCalls = successfulCalls + failedCalls;
+            double callsPerSecond = steadyElapsedSeconds > 0
+                ? totalCalls / steadyElapsedSeconds
+                : 0;
+
+            object stats = new
+            {
+                language = "dotnet",
+                command = "bench-read-bulk",
+                endpoint = arguments.GetOptional("endpoint") ?? "(default)",
+                clientName,
+                bulkSize,
+                durationSeconds,
+                warmupSeconds,
+                durationMs = (long)(steadyElapsedSeconds * 1000),
+                tags,
+                totalCalls,
+                successfulCalls,
+                failedCalls,
+                totalReadResults,
+                cachedReadResults,
+                callsPerSecond = Math.Round(callsPerSecond, 2),
+                latencyMs = new
+                {
+                    p50 = Percentile(latencyMillis, 0.50),
+                    p95 = Percentile(latencyMillis, 0.95),
+                    p99 = Percentile(latencyMillis, 0.99),
+                    max = latencyMillis.Count > 0 ? Math.Round(latencyMillis.Max(), 3) : 0,
+                    mean = latencyMillis.Count > 0 ? Math.Round(latencyMillis.Average(), 3) : 0,
+                },
+            };
+            output.WriteLine(JsonSerializer.Serialize(stats, JsonOptions));
+            return 0;
+        }
+        finally
+        {
+            // The run token may already be cancelled (Ctrl+C); close with a
+            // short independent deadline so the gateway session is not leaked.
+            using CancellationTokenSource closeTimeout = new(TimeSpan.FromSeconds(5));
+            try
+            {
+                await client.CloseSessionAsync(
+                    new CloseSessionRequest { SessionId = sessionId, ClientCorrelationId = CreateCorrelationId() },
+                    closeTimeout.Token)
+                    .ConfigureAwait(false);
+            }
+            catch
+            {
+                // Closing the session is best-effort — never let it mask a real bench error.
+            }
+        }
+    }
+
+    /// <summary>
+    /// Computes the requested percentile from an unsorted latency sample by
+    /// linear interpolation between the two closest ranks. Rounds to 3 decimal
+    /// places to match the JSON schema the PS driver collates.
+    /// </summary>
+    /// <param name="sample">Latency samples in milliseconds; sorted on a private copy.</param>
+    /// <param name="quantile">Quantile in the range 0..1, e.g. 0.95 for p95.</param>
+    /// <returns>The interpolated percentile, or 0 for an empty sample.</returns>
+    private static double Percentile(IReadOnlyList<double> sample, double quantile)
+    {
+        if (sample.Count == 0)
+        {
+            return 0;
+        }
+
+        double[] sorted = sample.ToArray();
+        Array.Sort(sorted);
+        if (sorted.Length == 1)
+        {
+            return Math.Round(sorted[0], 3);
+        }
+
+        double rank = quantile * (sorted.Length - 1);
+        int lower = (int)Math.Floor(rank);
+        int upper = (int)Math.Ceiling(rank);
+        double fraction = rank - lower;
+        double value = sorted[lower] + (sorted[upper] - sorted[lower]) * fraction;
+        return Math.Round(value, 3);
+    }
+
     /// <summary>
     /// Parses the bulk-write CLI's --values list. All entries share
     /// the single --type argument; the comma-separated values are
@@ -1239,6 +1490,7 @@ public static class MxGatewayClientCli
             or "write2-bulk"
             or "write-secured-bulk"
             or "write-secured2-bulk"
+            or "bench-read-bulk"
             or "stream-events"
             or "write"
             or "write2"
diff --git a/clients/go/cmd/mxgw-go/main.go b/clients/go/cmd/mxgw-go/main.go
index 301d072..e52a408 100644
--- a/clients/go/cmd/mxgw-go/main.go
+++ b/clients/go/cmd/mxgw-go/main.go
@@ -14,6 +14,7 @@ import (
 	"io"
 	"os"
 	"os/signal"
+	"sort"
 	"strconv"
 	"strings"
 	"syscall"
@@ -99,6 +100,8 @@ func runWithIO(ctx context.Context, args []string, stdout, stderr io.Writer) err
 		return runWriteSecuredBulk(ctx, args[1:], stdout, stderr)
 	case "write-secured2-bulk":
 		return runWriteSecured2Bulk(ctx, args[1:], stdout, stderr)
+	case "bench-read-bulk":
+		return runBenchReadBulk(ctx, args[1:], stdout, stderr)
 	case "write":
 		return runWrite(ctx, args[1:], stdout, stderr)
 	case "stream-events":
@@ -508,6 +511,192 @@ func runWriteBulkVariant(ctx context.Context, args []string, stdout, stderr io.W
 	return writeWriteBulkOutput(stdout, *jsonOutput, command, options, results, err)
 }
 
+// runBenchReadBulk drives the cross-language ReadBulk stress benchmark from Go:
+// opens its own session, subscribes to bulk-size tags so the worker value cache
+// populates from real OnDataChange events, runs ReadBulk in a tight loop for
+// duration-seconds with
per-call timing, and emits the shared JSON schema the
+// scripts/bench-read-bulk.ps1 driver collates across all five clients.
+func runBenchReadBulk(ctx context.Context, args []string, stdout, stderr io.Writer) error {
+	flags := flag.NewFlagSet("bench-read-bulk", flag.ContinueOnError)
+	flags.SetOutput(stderr)
+	common := bindCommonFlags(flags)
+	jsonOutput := flags.Bool("json", false, "write JSON output")
+	clientName := flags.String("client-name", "mxgw-go-bench", "session client name")
+	durationSeconds := flags.Int("duration-seconds", 30, "steady-state measurement window in seconds")
+	warmupSeconds := flags.Int("warmup-seconds", 3, "warm-up window before measurement, in seconds")
+	bulkSize := flags.Int("bulk-size", 6, "tags per ReadBulk call")
+	tagStart := flags.Int("tag-start", 1, "first machine number")
+	tagPrefix := flags.String("tag-prefix", "TestMachine_", "tag prefix (machine number appended as %03d)")
+	tagAttribute := flags.String("tag-attribute", "TestChangingInt", "attribute appended to each tag prefix")
+	timeoutMs := flags.Int("timeout-ms", 1500, "per-tag snapshot timeout in milliseconds")
+
+	if err := flags.Parse(args); err != nil {
+		return err
+	}
+	if *bulkSize < 1 {
+		return errors.New("bulk-size must be positive")
+	}
+	if *durationSeconds < 1 {
+		return errors.New("duration-seconds must be positive")
+	}
+
+	tags := make([]string, *bulkSize)
+	for i := 0; i < *bulkSize; i++ {
+		tags[i] = fmt.Sprintf("%s%03d.%s", *tagPrefix, *tagStart+i, *tagAttribute)
+	}
+
+	client, options, err := dialForCommand(ctx, common)
+	if err != nil {
+		return err
+	}
+	defer client.Close()
+
+	session, err := client.OpenSession(ctx, mxgateway.OpenSessionOptions{ClientSessionName: *clientName})
+	if err != nil {
+		return err
+	}
+	defer func() {
+		_, _ = session.Close(context.Background())
+	}()
+
+	serverHandle, err := session.Register(ctx, *clientName)
+	if err != nil {
+		return err
+	}
+
+	subscribeResults, err := session.SubscribeBulk(ctx, serverHandle, tags)
+	if err != nil {
+		return err
+	}
+	itemHandles := make([]int32, 0, len(subscribeResults))
+	for _, result := range subscribeResults {
+		if result.GetWasSuccessful() {
+			itemHandles = append(itemHandles, result.GetItemHandle())
+		}
+	}
+	defer func() {
+		if len(itemHandles) > 0 {
+			_, _ = session.UnsubscribeBulk(context.Background(), serverHandle, itemHandles)
+		}
+	}()
+
+	// Warm-up: drive identical calls so first-call / connection-pool setup is
+	// amortised before measuring. Both loops stop as soon as ctx is cancelled.
+	warmupDeadline := time.Now().Add(time.Duration(*warmupSeconds) * time.Second)
+	timeout := time.Duration(*timeoutMs) * time.Millisecond
+	for ctx.Err() == nil && time.Now().Before(warmupDeadline) {
+		_, _ = session.ReadBulk(ctx, serverHandle, tags, timeout)
+	}
+
+	// Steady state: per-call latency captured via time.Now() deltas.
+	latenciesMs := make([]float64, 0, 65536)
+	var totalReadResults int64
+	var cachedReadResults int64
+	var successfulCalls, failedCalls int
+	steadyStart := time.Now()
+	steadyDeadline := steadyStart.Add(time.Duration(*durationSeconds) * time.Second)
+
+	for ctx.Err() == nil && time.Now().Before(steadyDeadline) {
+		callStart := time.Now()
+		results, err := session.ReadBulk(ctx, serverHandle, tags, timeout)
+		elapsed := time.Since(callStart)
+		latenciesMs = append(latenciesMs, float64(elapsed.Nanoseconds())/1e6)
+		if err != nil {
+			failedCalls++
+			continue
+		}
+		successfulCalls++
+		for _, r := range results {
+			totalReadResults++
+			if r.GetWasCached() {
+				cachedReadResults++
+			}
+		}
+	}
+	steadyElapsed := time.Since(steadyStart)
+	totalCalls := successfulCalls + failedCalls
+
+	callsPerSecond := 0.0
+	if steadyElapsed.Seconds() > 0 {
+		callsPerSecond = float64(totalCalls) / steadyElapsed.Seconds()
+	}
+
+	stats := map[string]any{
+		"language":          "go",
+		"command":           "bench-read-bulk",
+		"endpoint":          options.Endpoint,
+		"clientName":        *clientName,
+		"bulkSize":          *bulkSize,
+		"durationSeconds":   *durationSeconds,
+		"warmupSeconds":     *warmupSeconds,
+		"durationMs":        steadyElapsed.Milliseconds(),
+		"tags":              tags,
+		"totalCalls":        totalCalls,
+		"successfulCalls":   successfulCalls,
+		"failedCalls":       failedCalls,
+		"totalReadResults":  totalReadResults,
+		"cachedReadResults": cachedReadResults,
+		"callsPerSecond":    roundTo(callsPerSecond, 2),
+		"latencyMs":         percentileSummary(latenciesMs),
+	}
+	if *jsonOutput {
+		return writeJSON(stdout, stats)
+	}
+	fmt.Fprintln(stdout, callsPerSecond)
+	return nil
+}
+
+// percentileSummary returns the same { p50, p95, p99, max, mean } shape every
+// language bench emits, rounded to 3 decimal places so the PowerShell driver
+// sees one schema across all five clients.
+func percentileSummary(sample []float64) map[string]float64 {
+	if len(sample) == 0 {
+		return map[string]float64{"p50": 0, "p95": 0, "p99": 0, "max": 0, "mean": 0}
+	}
+	sorted := append([]float64(nil), sample...)
+	sort.Float64s(sorted)
+	mean := 0.0
+	max := sorted[len(sorted)-1]
+	for _, v := range sample {
+		mean += v
+	}
+	mean /= float64(len(sample))
+	return map[string]float64{
+		"p50":  roundTo(percentile(sorted, 0.50), 3),
+		"p95":  roundTo(percentile(sorted, 0.95), 3),
+		"p99":  roundTo(percentile(sorted, 0.99), 3),
+		"max":  roundTo(max, 3),
+		"mean": roundTo(mean, 3),
+	}
+}
+
+// percentile linearly interpolates between the two closest ranks of an already
+// sorted sample; matches the .NET implementation so results are comparable.
+func percentile(sorted []float64, quantile float64) float64 {
+	if len(sorted) == 0 {
+		return 0
+	}
+	if len(sorted) == 1 {
+		return sorted[0]
+	}
+	rank := quantile * float64(len(sorted)-1)
+	lower := int(rank)
+	upper := lower + 1
+	if upper >= len(sorted) {
+		return sorted[lower]
+	}
+	fraction := rank - float64(lower)
+	return sorted[lower] + (sorted[upper]-sorted[lower])*fraction
+}
+
+func roundTo(value float64, digits int) float64 {
+	shift := 1.0
+	for i := 0; i < digits; i++ {
+		shift *= 10
+	}
+	return float64(int64(value*shift+0.5)) / shift
+}
+
 // parseRfc3339Timestamp parses an RFC 3339 timestamp and returns the
 // MxValue protobuf representation used for the timestamped write families.
 func parseRfc3339Timestamp(text string) (*mxgateway.MxValue, error) {
diff --git a/clients/java/mxgateway-cli/src/main/java/com/dohertylan/mxgateway/cli/MxGatewayCli.java b/clients/java/mxgateway-cli/src/main/java/com/dohertylan/mxgateway/cli/MxGatewayCli.java
index b3d47be..ca28816 100644
--- a/clients/java/mxgateway-cli/src/main/java/com/dohertylan/mxgateway/cli/MxGatewayCli.java
+++ b/clients/java/mxgateway-cli/src/main/java/com/dohertylan/mxgateway/cli/MxGatewayCli.java
@@ -120,6 +120,7 @@ public final class MxGatewayCli implements Callable<Integer> {
         commandLine.addSubcommand("write2-bulk", new Write2BulkCommand(clientFactory));
         commandLine.addSubcommand("write-secured-bulk", new WriteSecuredBulkCommand(clientFactory));
         commandLine.addSubcommand("write-secured2-bulk", new WriteSecured2BulkCommand(clientFactory));
+        commandLine.addSubcommand("bench-read-bulk", new BenchReadBulkCommand(clientFactory));
         commandLine.addSubcommand("write", new WriteCommand(clientFactory));
         commandLine.addSubcommand("stream-events", new StreamEventsCommand(clientFactory));
         commandLine.addSubcommand("smoke", new SmokeCommand(clientFactory));
@@ -769,6 +770,224 @@ public final class MxGatewayCli implements Callable<Integer> {
         }
     }
 
+    /**
+     * Cross-language ReadBulk stress benchmark — mirrors the .NET / Go /
Rust /
+     * Python implementations so the PS driver collates one JSON schema across
+     * all five clients.
+     */
+    @Command(name = "bench-read-bulk", description = "Cross-language ReadBulk stress benchmark.")
+    static final class BenchReadBulkCommand extends GatewayCommand {
+        @Option(names = "--client-name", defaultValue = "mxgw-java-bench")
+        String clientName;
+
+        @Option(names = "--duration-seconds", defaultValue = "30")
+        int durationSeconds;
+
+        @Option(names = "--warmup-seconds", defaultValue = "3")
+        int warmupSeconds;
+
+        @Option(names = "--bulk-size", defaultValue = "6")
+        int bulkSize;
+
+        @Option(names = "--tag-start", defaultValue = "1")
+        int tagStart;
+
+        @Option(names = "--tag-prefix", defaultValue = "TestMachine_")
+        String tagPrefix;
+
+        @Option(names = "--tag-attribute", defaultValue = "TestChangingInt")
+        String tagAttribute;
+
+        @Option(names = "--timeout-ms", defaultValue = "1500")
+        int timeoutMs;
+
+        BenchReadBulkCommand(MxGatewayCliClientFactory clientFactory) {
+            super(clientFactory);
+        }
+
+        @Override
+        public Integer call() {
+            if (bulkSize < 1) {
+                throw new IllegalArgumentException("bulk-size must be positive");
+            }
+            List<String> tags = new ArrayList<>(bulkSize);
+            for (int i = 0; i < bulkSize; i++) {
+                tags.add(String.format("%s%03d.%s", tagPrefix, tagStart + i, tagAttribute));
+            }
+
+            try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
+                var openReply = client.openSession(
+                        mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest.newBuilder()
+                                .setClientSessionName(clientName)
+                                .build());
+                String sessionId = openReply.getSessionId();
+                MxGatewayCliSession session = client.session(sessionId);
+                List<Integer> itemHandles = new ArrayList<>();
+                long steadyElapsedNanos;
+                long[] latenciesNanos;
+                int latencyCount = 0;
+                long successful = 0;
+                long failed = 0;
+                long totalResults = 0;
+                long cachedResults = 0;
+                // Register inside the try so a failed register still reaches the
+                // finally block and closes the session opened above.
+                int serverHandle = 0;
+                try {
+                    serverHandle = session.register(clientName);
+                    List<SubscribeResult> subscribeResults = session.subscribeBulk(serverHandle, tags);
+                    for (SubscribeResult r : subscribeResults) {
+                        if (r.getWasSuccessful()) {
+                            itemHandles.add(r.getItemHandle());
+                        }
+                    }
+
+                    // Warm-up window — drives identical calls so JIT / connection
+                    // pool effects are amortised before the measurement window.
+                    long warmupDeadline = System.nanoTime() + warmupSeconds * 1_000_000_000L;
+                    while (System.nanoTime() < warmupDeadline) {
+                        session.readBulk(serverHandle, tags, timeoutMs);
+                    }
+
+                    latenciesNanos = new long[Math.max(1024, durationSeconds * 1000)];
+                    long steadyStart = System.nanoTime();
+                    long steadyDeadline = steadyStart + durationSeconds * 1_000_000_000L;
+                    while (System.nanoTime() < steadyDeadline) {
+                        long callStart = System.nanoTime();
+                        try {
+                            List<BulkReadResult> results = session.readBulk(serverHandle, tags, timeoutMs);
+                            long elapsed = System.nanoTime() - callStart;
+                            // Double the buffer when full; copyOf keeps every recorded sample.
+                            if (latencyCount >= latenciesNanos.length) {
+                                latenciesNanos = java.util.Arrays.copyOf(latenciesNanos, latenciesNanos.length * 2);
+                            }
+                            latenciesNanos[latencyCount++] = elapsed;
+                            successful++;
+                            for (BulkReadResult r : results) {
+                                totalResults++;
+                                if (r.getWasCached()) {
+                                    cachedResults++;
+                                }
+                            }
+                        } catch (Exception ex) {
+                            long elapsed = System.nanoTime() - callStart;
+                            if (latencyCount >= latenciesNanos.length) {
+                                latenciesNanos = java.util.Arrays.copyOf(latenciesNanos, latenciesNanos.length * 2);
+                            }
+                            latenciesNanos[latencyCount++] = elapsed;
+                            failed++;
+                        }
+                    }
+                    steadyElapsedNanos = System.nanoTime() - steadyStart;
+                } finally {
+                    if (!itemHandles.isEmpty()) {
+                        try { session.unsubscribeBulk(serverHandle, itemHandles); } catch (Exception ignored) { }
+                    }
+                    try { client.closeSession(mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest.newBuilder()
+                            .setSessionId(sessionId).build()); } catch (Exception ignored) { }
+                }
+
+                long totalCalls = successful + failed;
+                double steadyElapsedSeconds = steadyElapsedNanos / 1_000_000_000.0;
+                double callsPerSecond = steadyElapsedSeconds > 0 ? totalCalls / steadyElapsedSeconds : 0.0;
+                writeBenchOutput(common, json, tags, clientName, bulkSize, durationSeconds, warmupSeconds,
+                        steadyElapsedNanos, totalCalls, successful, failed, totalResults, cachedResults,
+                        callsPerSecond, latenciesNanos, latencyCount);
+            }
+            return 0;
+        }
+    }
+
+    private static void writeBenchOutput(
+            CommonOptions common,
+            boolean json,
+            List<String> tags,
+            String clientName,
+            int bulkSize,
+            int durationSeconds,
+            int warmupSeconds,
+            long steadyElapsedNanos,
+            long totalCalls,
+            long successful,
+            long failed,
+            long totalResults,
+            long cachedResults,
+            double callsPerSecond,
+            long[] latenciesNanos,
+            int latencyCount) {
+        PrintWriter out = common.spec.commandLine().getOut();
+        Map<String, Double> latencyMs = percentileSummaryMs(latenciesNanos, latencyCount);
+        if (json) {
+            Map<String, Object> output = new LinkedHashMap<>();
+            output.put("language", "java");
+            output.put("command", "bench-read-bulk");
+            output.put("endpoint", common.endpoint);
+            output.put("clientName", clientName);
+            output.put("bulkSize", bulkSize);
+            output.put("durationSeconds", durationSeconds);
+            output.put("warmupSeconds", warmupSeconds);
+            output.put("durationMs", steadyElapsedNanos / 1_000_000L);
+            output.put("tags", tags);
+            output.put("totalCalls", totalCalls);
+            output.put("successfulCalls", successful);
+            output.put("failedCalls", failed);
+            output.put("totalReadResults", totalResults);
+            output.put("cachedReadResults", cachedResults);
+            output.put("callsPerSecond", roundTo(callsPerSecond, 2));
+            output.put("latencyMs", latencyMs);
+            out.println(jsonObject(output));
+            return;
+        }
+        out.println(callsPerSecond);
+    }
+
+    private static Map<String, Double> percentileSummaryMs(long[] latenciesNanos, int count) {
+        Map<String, Double> result = new LinkedHashMap<>();
+        if (count == 0) {
+            result.put("p50", 0.0);
+            result.put("p95", 0.0);
+            result.put("p99", 0.0);
+            result.put("max", 0.0);
+            result.put("mean", 0.0);
+            return result;
+        }
+        long[] sorted = new long[count];
+        System.arraycopy(latenciesNanos, 0, sorted, 0, count);
+        java.util.Arrays.sort(sorted);
+        double sumMs = 0.0;
+        for (int i = 0; i < count; i++) {
+            sumMs += sorted[i] / 1_000_000.0;
+        }
+        result.put("p50", roundTo(percentileMs(sorted, 0.50), 3));
+        result.put("p95", roundTo(percentileMs(sorted, 0.95), 3));
+        result.put("p99", roundTo(percentileMs(sorted, 0.99), 3));
+        result.put("max", roundTo(sorted[count - 1] / 1_000_000.0, 3));
+        result.put("mean", roundTo(sumMs / count, 3));
+        return result;
+    }
+
+    private static double percentileMs(long[] sorted, double quantile) {
+        int n = sorted.length;
+        if (n == 0) {
+            return 0.0;
+        }
+        if (n == 1) {
+            return sorted[0] / 1_000_000.0;
+        }
+        double rank = quantile * (n - 1);
+        int lower = (int) Math.floor(rank);
+        int upper = Math.min(lower + 1, n - 1);
+        double fraction = rank - lower;
+        double lowerMs = sorted[lower] / 1_000_000.0;
+        double upperMs = sorted[upper] / 1_000_000.0;
+        return lowerMs + (upperMs - lowerMs) * fraction;
+    }
+
+    private static double roundTo(double value, int digits) {
+        double shift = Math.pow(10, digits);
+        return Math.round(value * shift) / shift;
+    }
+
     @Command(name = "write", description = "Invokes MXAccess Write.")
     static final class WriteCommand extends GatewayCommand {
         @Option(names = "--session-id", required = true, description = "Gateway session id.")
diff --git a/clients/python/src/mxgateway_cli/commands.py b/clients/python/src/mxgateway_cli/commands.py
index 1c53087..124f74a 100644
--- a/clients/python/src/mxgateway_cli/commands.py
+++ b/clients/python/src/mxgateway_cli/commands.py
@@ -270,6 +270,29 @@ def write_secured2_bulk(**kwargs: Any) -> None:
     _run(_write_secured2_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
 
 
+@main.command("bench-read-bulk")
+@gateway_options
+@click.option("--client-name", default="mxgw-python-bench", show_default=True)
+@click.option("--duration-seconds", default=30, type=int, show_default=True)
+@click.option("--warmup-seconds", default=3, type=int, show_default=True)
+@click.option("--bulk-size", default=6, type=int, show_default=True)
+@click.option("--tag-start", default=1, type=int, show_default=True)
+@click.option("--tag-prefix", default="TestMachine_", show_default=True)
+@click.option("--tag-attribute", default="TestChangingInt", show_default=True)
+@click.option("--timeout-ms", default=1500, type=int, show_default=True)
+@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
+def bench_read_bulk(**kwargs: Any) -> None:
+    """Cross-language ReadBulk stress benchmark.
+
+    Opens its own session, subscribes to bulk-size tags so the worker value
+    cache populates from real OnDataChange events, runs ReadBulk in a tight
+    loop for duration-seconds, and emits the shared JSON stats schema the
+    scripts/bench-read-bulk.ps1 driver collates across all five clients.
+    """
+
+    _run(_bench_read_bulk(**kwargs), output_json=kwargs["output_json"], secrets=_secrets(kwargs))
+
+
 @main.command("stream-events")
 @gateway_options
 @click.option("--session-id", required=True, help="Gateway session id.")
@@ -538,6 +561,125 @@ async def _write_secured2_bulk(**kwargs: Any) -> dict[str, Any]:
     return {"results": [_message_dict(result) for result in results]}
 
 
+async def _bench_read_bulk(**kwargs: Any) -> dict[str, Any]:
+    """ReadBulk stress benchmark — matches the .NET / Go / Rust / Java schema.
+
+    Returns the stats mapping (camelCase keys plus a ``latencyMs`` summary)
+    that ``bench_read_bulk`` hands to ``_run``.
+    """
+    import time
+
+    bulk_size = int(kwargs["bulk_size"])
+    if bulk_size < 1:
+        raise click.UsageError("bulk-size must be positive")
+    duration_seconds = int(kwargs["duration_seconds"])
+    if duration_seconds < 1:
+        raise click.UsageError("duration-seconds must be positive")
+    warmup_seconds = int(kwargs["warmup_seconds"])
+    tag_start = int(kwargs["tag_start"])
+    tag_prefix = kwargs["tag_prefix"]
+    tag_attribute = kwargs["tag_attribute"]
+    timeout_ms = int(kwargs["timeout_ms"])
+    client_name = kwargs["client_name"]
+    tags = [f"{tag_prefix}{i:03d}.{tag_attribute}" for i in range(tag_start, tag_start + bulk_size)]
+
+    async with await _connect(kwargs) as client:
+        session = await client.open_session(client_session_name=client_name)
+        server_handle = 0
+        item_handles: list[int] = []
+        try:
+            server_handle = await session.register(client_name)
+            subscribe_results = await session.subscribe_bulk(server_handle, tags)
+            item_handles = [r.item_handle for r in subscribe_results if r.was_successful]
+
+            # Warm-up window so JIT / connection pool / first-call costs are
+            # amortised before the measurement window opens.
+            warmup_deadline = time.perf_counter() + warmup_seconds
+            while time.perf_counter() < warmup_deadline:
+                await session.read_bulk(server_handle, tags, timeout_ms=timeout_ms)
+
+            latencies_ms: list[float] = []
+            total_results = 0
+            cached_results = 0
+            successful = 0
+            failed = 0
+            steady_start = time.perf_counter()
+            steady_deadline = steady_start + duration_seconds
+            while time.perf_counter() < steady_deadline:
+                call_start = time.perf_counter()
+                try:
+                    results = await session.read_bulk(server_handle, tags, timeout_ms=timeout_ms)
+                except Exception:
+                    failed += 1
+                    latencies_ms.append((time.perf_counter() - call_start) * 1000.0)
+                    continue
+                latencies_ms.append((time.perf_counter() - call_start) * 1000.0)
+                successful += 1
+                for r in results:
+                    total_results += 1
+                    if r.was_cached:
+                        cached_results += 1
+            steady_elapsed = time.perf_counter() - steady_start
+            total_calls = successful + failed
+            calls_per_second = total_calls / steady_elapsed if steady_elapsed > 0 else 0.0
+        finally:
+            if item_handles:
+                try:
+                    await session.unsubscribe_bulk(server_handle, item_handles)
+                except Exception:
+                    pass
+            try:
+                await session.close()
+            except Exception:
+                pass
+
+    return {
+        "language": "python",
+        "command": "bench-read-bulk",
+        "endpoint": kwargs.get("endpoint"),
+        "clientName": client_name,
+        "bulkSize": bulk_size,
+        "durationSeconds": duration_seconds,
+        "warmupSeconds": warmup_seconds,
+        "durationMs": int(steady_elapsed * 1000),
+        "tags": tags,
+        "totalCalls": total_calls,
+        "successfulCalls": successful,
+        "failedCalls": failed,
+        "totalReadResults": total_results,
+        "cachedReadResults": cached_results,
+        "callsPerSecond": round(calls_per_second, 2),
+        "latencyMs": _percentile_summary(latencies_ms),
+    }
+
+
+def _percentile_summary(sample: list[float]) -> dict[str, float]:
+    if not sample:
+        return {"p50": 0.0, "p95": 0.0, "p99": 0.0, "max": 0.0, "mean": 0.0}
+    sorted_sample = sorted(sample)
+    return {
+        "p50": round(_percentile(sorted_sample, 0.50), 3),
+        "p95": round(_percentile(sorted_sample, 0.95), 3),
+        "p99": round(_percentile(sorted_sample, 0.99), 3),
+        "max": round(sorted_sample[-1], 3),
+        "mean": round(sum(sample) / len(sample), 3),
+    }
+
+
+def _percentile(sorted_sample: list[float], quantile: float) -> float:
+    """Linear interpolation between the two closest ranks; matches the other clients."""
+    n = len(sorted_sample)
+    if n == 0:
+        return 0.0
+    if n == 1:
+        return sorted_sample[0]
+    rank = quantile * (n - 1)
+    lower = int(rank)
+    upper = min(lower + 1, n - 1)
+    fraction = rank - lower
+    return sorted_sample[lower] + (sorted_sample[upper] - sorted_sample[lower]) * fraction
+
+
 async def _stream_events(**kwargs: Any) -> dict[str, Any]:
     async with await _connect(kwargs) as client:
         session = _session(client, kwargs["session_id"])
diff --git a/clients/rust/crates/mxgw-cli/src/main.rs b/clients/rust/crates/mxgw-cli/src/main.rs
index acc9934..21e153b 100644
--- a/clients/rust/crates/mxgw-cli/src/main.rs
+++ b/clients/rust/crates/mxgw-cli/src/main.rs
@@ -231,6 +231,32 @@ enum Command {
         #[arg(long)]
         json: bool,
     },
+    /// Cross-language ReadBulk stress benchmark. Opens its own session,
+    /// subscribes to bulk-size tags, then hammers ReadBulk in a tight loop
+    /// for duration-seconds and emits a JSON stats record the
+    /// scripts/bench-read-bulk.ps1 driver collates across all five clients.
+ BenchReadBulk { + #[command(flatten)] + connection: ConnectionArgs, + #[arg(long, default_value = "mxgw-rust-bench")] + client_name: String, + #[arg(long, default_value_t = 30)] + duration_seconds: u64, + #[arg(long, default_value_t = 3)] + warmup_seconds: u64, + #[arg(long, default_value_t = 6)] + bulk_size: usize, + #[arg(long, default_value_t = 1)] + tag_start: usize, + #[arg(long, default_value = "TestMachine_")] + tag_prefix: String, + #[arg(long, default_value = "TestChangingInt")] + tag_attribute: String, + #[arg(long, default_value_t = 1500)] + timeout_ms: u32, + #[arg(long)] + json: bool, + }, StreamEvents { #[command(flatten)] connection: ConnectionArgs, @@ -663,6 +689,38 @@ async fn run(cli: Cli) -> Result<(), Error> { .await?; print_write_bulk_results("write-secured2-bulk", &results, json); } + Command::BenchReadBulk { + connection, + client_name, + duration_seconds, + warmup_seconds, + bulk_size, + tag_start, + tag_prefix, + tag_attribute, + timeout_ms, + json, + } => { + if bulk_size == 0 { + return Err(Error::InvalidArgument { + name: "bulk-size".to_owned(), + detail: "must be positive".to_owned(), + }); + } + run_bench_read_bulk( + connection, + client_name, + duration_seconds, + warmup_seconds, + bulk_size, + tag_start, + tag_prefix, + tag_attribute, + timeout_ms, + json, + ) + .await?; + } Command::StreamEvents { connection, session_id, @@ -936,6 +994,161 @@ async fn session_for( Ok(client.session(session_id)) } +/// Cross-language ReadBulk stress benchmark — mirrors the .NET / Go / Python / +/// Java implementations so the PS driver collates one JSON schema across all +/// five clients.
+#[allow(clippy::too_many_arguments)] +async fn run_bench_read_bulk( + connection: ConnectionArgs, + client_name: String, + duration_seconds: u64, + warmup_seconds: u64, + bulk_size: usize, + tag_start: usize, + tag_prefix: String, + tag_attribute: String, + timeout_ms: u32, + use_json: bool, +) -> Result<(), Error> { + let endpoint = connection.endpoint.clone(); + let client = connect(connection).await?; + let session = client + .open_session(OpenSessionRequest { + client_session_name: client_name.clone(), + ..OpenSessionRequest::default() + }) + .await?; + + let tags: Vec<String> = (0..bulk_size) + .map(|i| format!("{tag_prefix}{:03}.{tag_attribute}", tag_start + i)) + .collect(); + + // Bench body in its own block so the trailing session.close() always + // runs, even on the early returns the loop body never hits today. + let bench_outcome = async { + let server_handle = session.register(&client_name).await?; + let subscribe_results = session.subscribe_bulk(server_handle, tags.clone()).await?; + let item_handles: Vec<_> = subscribe_results + .iter() + .filter(|r| r.was_successful) + .map(|r| r.item_handle) + .collect(); + + let warmup_deadline = std::time::Instant::now() + + std::time::Duration::from_secs(warmup_seconds); + while std::time::Instant::now() < warmup_deadline { + let _ = session + .read_bulk(server_handle, tags.clone(), timeout_ms) + .await; + } + + let mut latencies_ms: Vec<f64> = Vec::with_capacity(65_536); + let mut total_read_results: u64 = 0; + let mut cached_read_results: u64 = 0; + let mut successful_calls: u64 = 0; + let mut failed_calls: u64 = 0; + let steady_start = std::time::Instant::now(); + let steady_deadline = steady_start + std::time::Duration::from_secs(duration_seconds); + + while std::time::Instant::now() < steady_deadline { + let call_start = std::time::Instant::now(); + let outcome = session.read_bulk(server_handle, tags.clone(), timeout_ms).await; + let elapsed_ms = call_start.elapsed().as_secs_f64() * 1000.0; +
latencies_ms.push(elapsed_ms); + match outcome { + Ok(results) => { + successful_calls += 1; + for r in &results { + total_read_results += 1; + if r.was_cached { + cached_read_results += 1; + } + } + } + Err(_) => failed_calls += 1, + } + } + let steady_elapsed = steady_start.elapsed(); + + if !item_handles.is_empty() { + let _ = session.unsubscribe_bulk(server_handle, item_handles).await; + } + + let total_calls = successful_calls + failed_calls; + let calls_per_second = if steady_elapsed.as_secs_f64() > 0.0 { + total_calls as f64 / steady_elapsed.as_secs_f64() + } else { + 0.0 + }; + + let summary = percentile_summary(&latencies_ms); + let stats = serde_json::json!({ + "language": "rust", + "command": "bench-read-bulk", + "endpoint": endpoint, + "clientName": client_name, + "bulkSize": bulk_size, + "durationSeconds": duration_seconds, + "warmupSeconds": warmup_seconds, + "durationMs": steady_elapsed.as_millis() as u64, + "tags": tags, + "totalCalls": total_calls, + "successfulCalls": successful_calls, + "failedCalls": failed_calls, + "totalReadResults": total_read_results, + "cachedReadResults": cached_read_results, + "callsPerSecond": round_to(calls_per_second, 2), + "latencyMs": summary, + }); + if use_json { + println!("{}", stats); + } else { + println!("{calls_per_second}"); + } + Ok::<(), Error>(()) + } + .await; + + let _ = session.close().await; + bench_outcome +} + +fn percentile_summary(sample: &[f64]) -> serde_json::Value { + if sample.is_empty() { + return serde_json::json!({ "p50": 0.0, "p95": 0.0, "p99": 0.0, "max": 0.0, "mean": 0.0 }); + } + let mut sorted = sample.to_vec(); + sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)); + let max = sorted[sorted.len() - 1]; + let mean = sample.iter().sum::<f64>() / sample.len() as f64; + serde_json::json!({ + "p50": round_to(percentile(&sorted, 0.50), 3), + "p95": round_to(percentile(&sorted, 0.95), 3), + "p99": round_to(percentile(&sorted, 0.99), 3), + "max": round_to(max, 3), +
"mean": round_to(mean, 3), + }) +} + +fn percentile(sorted: &[f64], quantile: f64) -> f64 { + if sorted.is_empty() { + return 0.0; + } + if sorted.len() == 1 { + return sorted[0]; + } + let rank = quantile * (sorted.len() - 1) as f64; + let lower = rank.floor() as usize; + let upper = (lower + 1).min(sorted.len() - 1); + let fraction = rank - lower as f64; + sorted[lower] + (sorted[upper] - sorted[lower]) * fraction +} + +fn round_to(value: f64, digits: u32) -> f64 { + let shift = 10f64.powi(digits as i32); + (value * shift).round() / shift +} + fn print_version(use_json: bool) { if use_json { println!("{}", version_json()); diff --git a/scripts/bench-read-bulk.ps1 b/scripts/bench-read-bulk.ps1 new file mode 100644 index 0000000..7aa9cdc --- /dev/null +++ b/scripts/bench-read-bulk.ps1 @@ -0,0 +1,379 @@ +<# +.SYNOPSIS +Cross-language ReadBulk stress benchmark driver. + +.DESCRIPTION +Launches the bench-read-bulk subcommand of every client CLI (.NET, Go, Rust, +Python, Java) concurrently against a running gateway and worker. Each client +opens its own session, subscribes to -BulkSize tags so the worker's per-session +MxAccessValueCache populates from real OnDataChange events, then hammers +ReadBulk in a tight in-process loop for -DurationSeconds with per-call +high-resolution latency capture. Each emits a single JSON stats object on +stdout; this script collates the five into a comparison table. + +The gateway and worker are assumed to be running at -Endpoint with the API +key in the environment variable named by -ApiKeyEnv. + +.PARAMETER Clients +Which clients to run. Defaults to all five. + +.PARAMETER Endpoint +gRPC endpoint of the gateway. Default localhost:5120. + +.PARAMETER ApiKeyEnv +Environment variable holding the API key. Default MXGATEWAY_API_KEY. + +.PARAMETER DurationSeconds +Steady-state measurement window per client. + +.PARAMETER WarmupSeconds +Warm-up window per client (calls during this window are discarded). + +.PARAMETER BulkSize +Number of tags per ReadBulk call.
+ +.PARAMETER TagStart +First machine number per client. Each client uses a contiguous range starting +here, so machine ranges do not overlap when -DistinctTags is set. + +.PARAMETER TagPrefix +Tag prefix (machine number is appended as %03d). + +.PARAMETER TagAttribute +Attribute appended to each tag. + +.PARAMETER DistinctTags +When set, each client uses its own slice of tags (clients[i] starts at +TagStart + i * BulkSize). When unset (default), all clients hit the same +tags to maximise contention on the worker's value cache. + +.PARAMETER ReportPath +Where to persist the combined report. Defaults to artifacts/bench/... +#> +[CmdletBinding()] +param( + [string[]]$Clients = @("dotnet", "go", "rust", "python", "java"), + [string]$Endpoint = "localhost:5120", + [string]$ApiKeyEnv = "MXGATEWAY_API_KEY", + [int]$DurationSeconds = 30, + [int]$WarmupSeconds = 3, + [int]$BulkSize = 6, + [int]$TagStart = 1, + [string]$TagPrefix = "TestMachine_", + [string]$TagAttribute = "TestChangingInt", + [int]$TimeoutMs = 1500, + [switch]$DistinctTags, + [string]$ReportPath +) + +Set-StrictMode -Version Latest +$ErrorActionPreference = "Stop" + +$repoRoot = Resolve-Path (Join-Path $PSScriptRoot "..") +$validClients = @("dotnet", "go", "rust", "python", "java") +foreach ($c in $Clients) { + if ($validClients -notcontains $c) { + throw "Unsupported client '$c'. Valid: $($validClients -join ', ')." + } +} + +if ([string]::IsNullOrWhiteSpace($ReportPath)) { + $timestamp = Get-Date -Format "yyyyMMdd-HHmmss" + $ReportPath = Join-Path $repoRoot "artifacts/bench/bench-read-bulk-$timestamp.json" +} +$reportDir = Split-Path -Parent $ReportPath +if ($reportDir -and -not (Test-Path $reportDir)) { # $reportDir is empty when -ReportPath is a bare file name + New-Item -ItemType Directory -Path $reportDir -Force | Out-Null +} + +$apiKeyValue = [Environment]::GetEnvironmentVariable($ApiKeyEnv) # yields $null when unset; no property access on $null under StrictMode +if ([string]::IsNullOrWhiteSpace($apiKeyValue)) { + throw "The API key environment variable '$ApiKeyEnv' is not set. Define it before running the bench."
+} + +# Temp dir for per-client stdout/stderr capture + (Java only) a one-shot +# wrapper .bat that handles cmd.exe's quoting rules for `gradle --args="..."`. +$tmpDir = Join-Path ([System.IO.Path]::GetTempPath()) "mxgw-bench-$([guid]::NewGuid())" +New-Item -ItemType Directory -Path $tmpDir -Force | Out-Null + +function ConvertTo-HttpEndpoint { + param([string]$Value) + if ($Value -match '^https?://') { return $Value } + return "http://$Value" +} + +function ConvertTo-HostEndpoint { + param([string]$Value) + return ($Value -replace '^https?://', '') +} + +# Build the per-client command array. Each client gets its own tag range when +# -DistinctTags is set so the workers race against distinct cache slices. +function Get-ClientCommand { + param( + [string]$Client, + [int]$ClientIndex + ) + + $effectiveTagStart = if ($DistinctTags) { $TagStart + ($ClientIndex * $BulkSize) } else { $TagStart } + $httpEndpoint = ConvertTo-HttpEndpoint -Value $Endpoint + $hostEndpoint = ConvertTo-HostEndpoint -Value $Endpoint + $clientName = "mxgw-$Client-bench" + + # Per-call gRPC timeout must exceed (DurationSeconds + WarmupSeconds + slack) + # — otherwise the channel-wide timeout cancels the bench mid-loop.
+ $callTimeoutSeconds = [int]([Math]::Max(60, $DurationSeconds + $WarmupSeconds + 30)) + + switch ($Client) { + "dotnet" { + $cliArgs = @( + "run", "--project", "clients/dotnet/MxGateway.Client.Cli", "--no-build", "--", + "bench-read-bulk", + "--endpoint", $httpEndpoint, + "--api-key-env", $ApiKeyEnv, + "--timeout", "${callTimeoutSeconds}s", + "--client-name", $clientName, + "--duration-seconds", "$DurationSeconds", + "--warmup-seconds", "$WarmupSeconds", + "--bulk-size", "$BulkSize", + "--tag-start", "$effectiveTagStart", + "--tag-prefix", $TagPrefix, + "--tag-attribute", $TagAttribute, + "--timeout-ms", "$TimeoutMs", + "--json" + ) + return [pscustomobject]@{ file = "dotnet"; args = $cliArgs; cwd = $repoRoot } + } + "go" { + $cliArgs = @( + "run", "./cmd/mxgw-go", "bench-read-bulk", + "-endpoint", $hostEndpoint, + "-api-key-env", $ApiKeyEnv, + "-plaintext", + "-json", + "-client-name", $clientName, + "-duration-seconds", "$DurationSeconds", + "-warmup-seconds", "$WarmupSeconds", + "-bulk-size", "$BulkSize", + "-tag-start", "$effectiveTagStart", + "-tag-prefix", $TagPrefix, + "-tag-attribute", $TagAttribute, + "-timeout-ms", "$TimeoutMs" + ) + return [pscustomobject]@{ file = "go"; args = $cliArgs; cwd = (Join-Path $repoRoot "clients/go") } + } + "rust" { + $cliArgs = @( + "run", "--quiet", "-p", "mxgw-cli", "--", + "bench-read-bulk", + "--endpoint", $httpEndpoint, + "--api-key-env", $ApiKeyEnv, + "--client-name", $clientName, + "--duration-seconds", "$DurationSeconds", + "--warmup-seconds", "$WarmupSeconds", + "--bulk-size", "$BulkSize", + "--tag-start", "$effectiveTagStart", + "--tag-prefix", $TagPrefix, + "--tag-attribute", $TagAttribute, + "--timeout-ms", "$TimeoutMs", + "--json" + ) + return [pscustomobject]@{ file = "cargo"; args = $cliArgs; cwd = (Join-Path $repoRoot "clients/rust") } + } + "python" { + $cliArgs = @( + "-m", "mxgateway_cli", "bench-read-bulk", + "--endpoint", $hostEndpoint, + "--api-key-env", $ApiKeyEnv, + "--plaintext", + "--client-name",
$clientName, + "--duration-seconds", "$DurationSeconds", + "--warmup-seconds", "$WarmupSeconds", + "--bulk-size", "$BulkSize", + "--tag-start", "$effectiveTagStart", + "--tag-prefix", $TagPrefix, + "--tag-attribute", $TagAttribute, + "--timeout-ms", "$TimeoutMs", + "--json" + ) + $python = (Get-Command "python" -CommandType Application -ErrorAction Stop | Select-Object -First 1).Source # resolve from PATH, not a machine-specific install path; fails fast when Python is missing + return [pscustomobject]@{ file = $python; args = $cliArgs; cwd = (Join-Path $repoRoot "clients/python"); pythonpath = (Join-Path $repoRoot "clients/python/src") } + } + "java" { + $inner = @( + "bench-read-bulk", + "--endpoint", $hostEndpoint, + "--api-key-env", $ApiKeyEnv, + "--plaintext", + "--json", + "--client-name", $clientName, + "--duration-seconds", "$DurationSeconds", + "--warmup-seconds", "$WarmupSeconds", + "--bulk-size", "$BulkSize", + "--tag-start", "$effectiveTagStart", + "--tag-prefix", $TagPrefix, + "--tag-attribute", $TagAttribute, + "--timeout-ms", "$TimeoutMs" + ) + $gradle = (Get-Command "gradle.bat", "gradle.cmd", "gradle.exe", "gradle" -ErrorAction SilentlyContinue | Select-Object -First 1) + if ($null -eq $gradle) { throw "gradle not on PATH; required for the Java bench." } + # Start-Process with ArgumentList mangles the `--args="..."` quoting + # cmd.exe needs to keep the whole bench-args expression as a single + # gradle argument. Workaround: write a one-shot .bat that contains + # the literal gradle command line and invoke that batch via cmd. + $batPath = Join-Path $tmpDir "java-bench.bat" + $batContent = '@echo off' + "`r`n" + + '"' + $gradle.Source + '" --quiet :mxgateway-cli:run "--args=' + ($inner -join ' ') + '"' + "`r`n" + Set-Content -Path $batPath -Value $batContent -Encoding ASCII + return [pscustomobject]@{ file = "cmd.exe"; args = @("/c", $batPath); cwd = (Join-Path $repoRoot "clients/java") } + } + } +} + +# Start one detached process per client and wait for all.
Stdout (the JSON +# stats line) is captured to a per-client tmp file; stderr is captured too in +# case a bench crashed. +$jobs = @() +$benchStartedAt = (Get-Date).ToUniversalTime().ToString("o") # captured before launch so the report's startedAt is the real start time +Write-Host "Launching $($Clients.Count) concurrent benches against $Endpoint (duration=$($DurationSeconds)s, warmup=$($WarmupSeconds)s, bulkSize=$BulkSize, distinctTags=$([bool]$DistinctTags))" + +for ($i = 0; $i -lt $Clients.Count; $i++) { + $client = $Clients[$i] + $cmd = Get-ClientCommand -Client $client -ClientIndex $i + $stdoutPath = Join-Path $tmpDir "$client.out" + $stderrPath = Join-Path $tmpDir "$client.err" + $startArgs = @{ + FilePath = $cmd.file + ArgumentList = $cmd.args + WorkingDirectory = $cmd.cwd + RedirectStandardOutput = $stdoutPath + RedirectStandardError = $stderrPath + NoNewWindow = $true + PassThru = $true + } + if ($cmd.PSObject.Properties['pythonpath']) { + # Python needs PYTHONPATH so the editable mxgateway_cli module resolves. + $env:PYTHONPATH = $cmd.pythonpath + } + $process = Start-Process @startArgs + $jobs += [pscustomobject]@{ client = $client; process = $process; stdoutPath = $stdoutPath; stderrPath = $stderrPath } + Write-Host " [$client] pid=$($process.Id)" +} + +foreach ($job in $jobs) { + $job.process.WaitForExit() +} + +# Parse one JSON line per client. The line is typically the last +# `{`-prefixed line in stdout (gradle, dotnet run, cargo run can emit log +# noise before it). +function Get-JsonStats { + param([string]$Path) + if (-not (Test-Path $Path)) { return $null } + $content = Get-Content -Path $Path -Raw + if ([string]::IsNullOrWhiteSpace($content)) { return $null } + + # Scan from the LAST top-level `{` (the bench JSON is the final structured + # output line; earlier text may be log noise from `dotnet run` / `cargo + # run` / `gradle :run`). Walk forward counting braces to locate the + # matching `}` so nested objects like `latencyMs` don't confuse the parser.
+ $startIndex = -1 + $depth = 0 + for ($i = $content.Length - 1; $i -ge 0; $i--) { + $ch = $content[$i] + if ($ch -eq '}') { $depth++ } + elseif ($ch -eq '{') { + $depth-- + if ($depth -eq 0) { $startIndex = $i; break } + } + } + if ($startIndex -lt 0) { return $null } + + $endIndex = -1 + $depth = 0 + for ($i = $startIndex; $i -lt $content.Length; $i++) { + $ch = $content[$i] + if ($ch -eq '{') { $depth++ } + elseif ($ch -eq '}') { + $depth-- + if ($depth -eq 0) { $endIndex = $i; break } + } + } + if ($endIndex -lt 0) { return $null } + + $json = $content.Substring($startIndex, $endIndex - $startIndex + 1) + try { return $json | ConvertFrom-Json } + catch { return $null } +} + +$results = @() +foreach ($job in $jobs) { + $stats = Get-JsonStats -Path $job.stdoutPath + if ($null -eq $stats) { + $stderr = if (Test-Path $job.stderrPath) { (Get-Content -Path $job.stderrPath -Raw) } else { "" } + Write-Warning "[$($job.client)] no JSON stats parsed; exit=$($job.process.ExitCode); stderr=$(if ([string]::IsNullOrWhiteSpace($stderr)) { '(empty)' } else { $stderr.Substring(0, [Math]::Min(300, $stderr.Length)) })" + $results += [pscustomobject]@{ client = $job.client; exitCode = $job.process.ExitCode; stats = $null; stderr = $stderr } + } else { + $results += [pscustomobject]@{ client = $job.client; exitCode = $job.process.ExitCode; stats = $stats; stderr = $null } + } +} + +# Pretty-print a side-by-side table.
+$rows = foreach ($r in $results) { + if ($null -eq $r.stats) { + [pscustomobject]@{ + client = $r.client + "calls/sec" = "ERR" + "total" = "-" + "ok" = "-" + "fail" = "-" + "cached/total" = "-" + "p50 ms" = "-" + "p95 ms" = "-" + "p99 ms" = "-" + "max ms" = "-" + "mean ms" = "-" + } + } else { + $s = $r.stats + [pscustomobject]@{ + client = $s.language + "calls/sec" = $s.callsPerSecond + "total" = $s.totalCalls + "ok" = $s.successfulCalls + "fail" = $s.failedCalls + "cached/total" = "$($s.cachedReadResults)/$($s.totalReadResults)" + "p50 ms" = $s.latencyMs.p50 + "p95 ms" = $s.latencyMs.p95 + "p99 ms" = $s.latencyMs.p99 + "max ms" = $s.latencyMs.max + "mean ms" = $s.latencyMs.mean + } + } +} + +$rows | Format-Table -AutoSize | Out-Host + +$report = [pscustomobject]@{ + schemaVersion = 1 + endpoint = $Endpoint + apiKeyEnv = $ApiKeyEnv + durationSeconds = $DurationSeconds + warmupSeconds = $WarmupSeconds + bulkSize = $BulkSize + distinctTags = [bool]$DistinctTags + tagPrefix = $TagPrefix + tagAttribute = $TagAttribute + startedAt = $benchStartedAt + clients = @($results | ForEach-Object { # @() keeps this a JSON array even when only one client ran + [ordered]@{ + client = $_.client + exitCode = $_.exitCode + stats = $_.stats + } + }) +} +$report | ConvertTo-Json -Depth 12 | Set-Content -Path $ReportPath -Encoding UTF8 +Write-Host "Combined report written to: $ReportPath" + +Remove-Item -Path $tmpDir -Recurse -Force -ErrorAction SilentlyContinue