mxaccessgw/scripts/run-client-e2e-tests.ps1
Joseph Doherty 71d2c39f01 e2e: port batch subcommand to all five client CLIs
scripts/run-client-e2e-tests.ps1 expects each language CLI to expose a
`batch` subcommand that reads command lines from stdin, runs each
through the normal subcommand dispatch, writes the JSON result, then
a sentinel line `__MXGW_BATCH_EOR__`. The implementation lived on a
divergent branch (commit 6126099) that was never merged into main —
this commit ports the same protocol to HEAD's renamed CLIs so the
existing matrix script runs end-to-end.

The protocol:
  - one line of stdin = one full CLI invocation
  - successful output → stdout, then __MXGW_BATCH_EOR__
  - failure → {"error":"...","type":"error"} JSON on stdout, then
    __MXGW_BATCH_EOR__ (errors do NOT exit the loop)
  - empty line or EOF terminates the loop
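
A minimal sketch of both ends of the protocol above, in Python. It is
illustrative only, not the code from any of the five CLIs: `run_batch`,
`split_results`, and `fake_dispatch` are hypothetical names, and the
shlex-based line splitting is an assumption (each CLI tokenises lines
in its own way).

```python
import io
import json
import shlex
from typing import Callable, Iterator, List, TextIO

EOR = "__MXGW_BATCH_EOR__"  # sentinel line named in the protocol above


def run_batch(stdin: TextIO, stdout: TextIO,
              dispatch: Callable[[List[str]], str]) -> None:
    """CLI side: one stdin line = one invocation, each followed by EOR."""
    for raw in stdin:
        line = raw.strip()
        if not line:  # an empty line ends the loop, just like EOF
            break
        try:
            # Assumption: shell-style tokenisation of the command line.
            output = dispatch(shlex.split(line))
        except Exception as exc:  # errors are reported and the loop continues
            output = json.dumps({"error": str(exc), "type": "error"})
        if output:
            stdout.write(output.rstrip("\n") + "\n")
        stdout.write(EOR + "\n")
        stdout.flush()


def split_results(stream: TextIO) -> Iterator[str]:
    """Harness side: yield each result block without its sentinel line."""
    block: List[str] = []
    for raw in stream:
        text = raw.rstrip("\r\n")
        if text == EOR:
            yield "\n".join(block)
            block = []
        else:
            block.append(text)


def fake_dispatch(argv: List[str]) -> str:
    """Hypothetical stand-in for a CLI's normal subcommand dispatch."""
    if argv[0] != "open-session":
        raise ValueError(f"unknown subcommand {argv[0]}")
    return json.dumps({"sessionId": "s-1"})


out = io.StringIO()
run_batch(io.StringIO("open-session --json\nbogus\n\nignored\n"), out, fake_dispatch)
results = [json.loads(block) for block in split_results(io.StringIO(out.getvalue()))]
```

Here `results` holds one success object and one error envelope; the
line after the empty line is never dispatched. Flushing after every
sentinel matters in a real process, because the harness blocks on the
sentinel before it sends the next command.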

Per-CLI additions:

  .NET: RunBatchAsync + per-line StringWriter capture, JSON error
    envelope when forceJsonErrors is true. Two new tests in
    MxGatewayClientCliTests covering the success and error paths.

  Go:   runBatch with bufio.Scanner, runs each line through the
    existing runWithIO switch with a buffered stdout writer. One new
    test pinning the EOR sentinel.

  Rust: new `Batch` variant on the clap Command enum, run_batch
    re-parses each line via Cli::try_parse_from. Two new tests in the
    inline mod tests block.

  Python: new `batch` click command in commands.py that uses
    CliRunner to dispatch each line; synthesises {"error":...,"type":...}
    JSON from click error messages when the captured output isn't
    already JSON-shaped. Three new tests in test_cli.py.

  Java: BatchCommand inner @Command with BufferedReader stdin loop,
    fresh commandLine() per dispatch with captured stdout/stderr
    PrintWriters; non-zero exit codes and uncaught exceptions both
    surface as JSON-error blocks. Two new tests.
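
The Python note above says the error envelope is synthesised only when
the captured output is not already JSON-shaped. A rough sketch of that
decision, using only the standard library; `to_result_block` and its
`exit_code` / `message` parameters are hypothetical and are not taken
from commands.py.

```python
import json


def to_result_block(captured: str, exit_code: int, message: str) -> str:
    """Pass through output that is already a JSON object; otherwise wrap
    the failure message in the batch error envelope."""
    text = captured.strip()
    if exit_code == 0:
        return text
    try:
        if isinstance(json.loads(text), dict):
            return text  # the subcommand already printed a JSON object
    except ValueError:
        pass  # not JSON: fall through and synthesise the envelope
    return json.dumps({"error": message or text, "type": "error"})
```

A usage error such as `to_result_block("Usage: cli [OPTIONS]", 2,
"No such option: --bogus")` comes back as an envelope, while a failing
subcommand that already printed a JSON object is forwarded unchanged.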

Also fixes scripts/run-client-e2e-tests.ps1 line 705: the Python
invocation was still passing the old module name `mxgateway_cli` to
`python -m`; the client SDK rename in 397d3c5 moved it to
`zb_mom_ww_mxgateway_cli`. Without the fix the Python leg fails
with "No module named mxgateway_cli" before reaching open-session.

Verification: full matrix at the redeployed gateway (localhost:5120,
running ZB.MOM.WW.MxGateway.Server.exe / ZB.MOM.WW.MxGateway.Worker.exe)
with -SkipBulk -SkipReadWriteBulk -SkipParity -SkipAuth (those phases
exercise bulk read/write CLI subcommands that also live on the
divergent branch — porting those is a follow-up). All five clients
report `closed=true, addedItems=120, eventCount=5` and overall
`success=true`. Per-language unit tests pass:
  - dotnet: 59/59
  - go:     all packages clean
  - rust:   cargo test --workspace clean
  - python: 42/42
  - java:   gradle build SUCCESSFUL

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 04:08:15 -04:00


<#
.SYNOPSIS
Cross-language client e2e matrix for the MXAccess Gateway.
.DESCRIPTION
Drives the .NET, Go, Rust, Python, and Java client CLIs against a running
gateway + worker. For each language the script exercises session open/close,
register, bulk subscribe/unsubscribe, per-tag add-item/advise, event
streaming, a write round-trip with value assertion, error-path (parity)
checks, and API-key auth rejection. With -VerifyAlarms it also exercises the
session-less stream-alarms and acknowledge-alarm subcommands against the
gateway's central alarm monitor.
Each client CLI is driven through one long-lived `batch` process: the harness
writes one command line to its stdin and reads the JSON result back, so the
~250 operations per client pay the process (and JVM/runtime) cold-start once
instead of once per operation.
The gateway and worker are assumed to be already running at -Endpoint; the
script does not start or stop them.
#>
[CmdletBinding()]
param(
[string[]]$Clients = @("dotnet", "go", "rust", "python", "java"),
[int]$MachineStart = 1,
[int]$MachineEnd = 20,
[string[]]$Attributes = @(
"ProtectedValue",
"TestChangingInt",
"TestBoolArray",
"TestIntArray",
"TestDateTimeArray",
"TestStringArray"
),
[string]$Endpoint = "localhost:5000",
[string]$ApiKeyEnv = "MXGATEWAY_API_KEY",
[string]$SqlServer = "localhost",
[string]$Database = "ZB",
[int]$EventLimit = 5,
[int]$BulkTagCount = 6,
# The per-tag advise loop advises every discovered tag with no StreamEvents
# consumer attached, so MXAccess change events accumulate in the worker
# event channel (MxGateway:Events:QueueCapacity). Left undrained, the channel
# overflows under FailFast backpressure and faults the worker; slow,
# process-per-call clients (the Java CLI) hit this before the loop ends.
# After every DrainEveryTags advised tags the loop connects a short-lived
# StreamEvents drain so the gateway pumps that channel empty. 0 disables it.
[int]$DrainEveryTags = 15,
[switch]$SkipStream,
[switch]$SkipBulk,
# Skip the bulk read+write coverage that runs alongside the existing
# subscribe-bulk phase. The read-bulk phase confirms cached-path
# semantics against tags left advised by subscribe-bulk (was_cached
# = true); the write-bulk phase runs when -VerifyWrite is set and
# exercises the BulkWriteResult shape against the writable tag.
[switch]$SkipReadWriteBulk,
# Write round-trip. Opt-in because it mutates live tag state: it writes a
# sentinel value to -WriteAttribute and asserts an OnWriteComplete event
# confirms the write reached the MXAccess provider.
[switch]$VerifyWrite,
[string]$WriteAttribute = "TestChangingInt",
[string]$WriteType = "int32",
[int]$WriteValueBase = 424200,
[int]$WriteEchoMaxEvents = 200,
# Alarm feed + acknowledge coverage. Opt-in because it depends on the
# gateway's central alarm monitor being enabled (MxGateway:Alarms:Enabled)
# and a live alarm provider: stream-alarms reads the monitor's snapshot and
# acknowledge-alarm acknowledges -AlarmReference. Both RPCs are session-less
# — they exercise the gateway's always-on monitor, not a client session.
[switch]$VerifyAlarms,
[string]$AlarmReference = "Galaxy!TestArea.TestMachine_001.TestAlarm001",
# Messages to read from the central alarm feed. 1 is enough to confirm the
# subcommand round-trips: the feed's first message (an active-alarm
# snapshot, or snapshot-complete when no alarms are active) always arrives
# immediately, whereas later messages depend on live alarm transitions.
[int]$AlarmStreamMax = 1,
# Error-path (parity) checks.
[switch]$SkipParity,
# API-key auth rejection checks.
[switch]$SkipAuth,
# Optional env var holding an API key whose scopes are insufficient for
# open-session; when supplied the auth phase also asserts that key is
# rejected (PermissionDenied) on top of the always-on missing-key check.
[string]$RejectScopeApiKeyEnv,
# Run each language client concurrently as an isolated child process.
[switch]$Parallel,
[switch]$DryRun,
[string]$ReportPath,
# Internal: set by -Parallel on each spawned child so it always writes its
# report (even under -DryRun) for the parent to merge. Not for direct use.
[switch]$EmitReport
)
Set-StrictMode -Version Latest
$ErrorActionPreference = "Stop"
$repoRoot = Resolve-Path (Join-Path $PSScriptRoot "..")
$discoveryScript = Join-Path $PSScriptRoot "discover-testmachine-tags.ps1"
$validClients = @("dotnet", "go", "rust", "python", "java")
$Clients = @($Clients | ForEach-Object {
$_ -split ","
} | ForEach-Object {
$_.Trim().ToLowerInvariant()
} | Where-Object {
-not [string]::IsNullOrWhiteSpace($_)
})
$Attributes = @($Attributes | ForEach-Object {
$_ -split ","
} | ForEach-Object {
$_.Trim()
} | Where-Object {
-not [string]::IsNullOrWhiteSpace($_)
})
if ($Clients.Count -eq 0) {
throw "At least one client is required."
}
if ($Attributes.Count -eq 0) {
throw "At least one attribute is required."
}
if ($BulkTagCount -lt 1) {
throw "BulkTagCount must be greater than zero."
}
if ($DrainEveryTags -lt 0) {
throw "DrainEveryTags cannot be negative."
}
if ($WriteEchoMaxEvents -lt 1) {
throw "WriteEchoMaxEvents must be greater than zero."
}
if ($AlarmStreamMax -lt 1) {
throw "AlarmStreamMax must be greater than zero."
}
foreach ($client in $Clients) {
if ($validClients -notcontains $client) {
throw "Unsupported client '$client'. Supported clients: $($validClients -join ', ')."
}
}
if ([string]::IsNullOrWhiteSpace($ReportPath)) {
$timestamp = Get-Date -Format "yyyyMMdd-HHmmss"
$ReportPath = Join-Path $repoRoot "artifacts/e2e/testmachine-client-e2e-$timestamp.json"
}
function ConvertTo-HttpEndpoint {
param([string]$Value)
if ($Value.StartsWith("http://", [StringComparison]::OrdinalIgnoreCase) -or
$Value.StartsWith("https://", [StringComparison]::OrdinalIgnoreCase)) {
return $Value
}
return "http://$Value"
}
function ConvertTo-HostEndpoint {
param([string]$Value)
$hostValue = $Value
if ($hostValue.StartsWith("http://", [StringComparison]::OrdinalIgnoreCase)) {
$hostValue = $hostValue.Substring(7)
} elseif ($hostValue.StartsWith("https://", [StringComparison]::OrdinalIgnoreCase)) {
$hostValue = $hostValue.Substring(8)
}
return $hostValue.TrimEnd("/")
}
function Join-CommandLine {
param(
[string]$FilePath,
[string[]]$Arguments
)
$parts = @($FilePath) + $Arguments
return ($parts | ForEach-Object { ConvertTo-NativeArgument -Value $_ }) -join " "
}
function ConvertTo-NativeArgument {
param([string]$Value)
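# An empty argument must still be quoted, otherwise it vanishes from the
# joined command line.
if ($Value.Length -gt 0 -and $Value -notmatch '[\s"]') {
return $Value
}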
return '"' + $Value.Replace('\', '\\').Replace('"', '\"') + '"'
}
function Invoke-NativeCommand {
param(
[string]$FilePath,
[string[]]$Arguments,
[string]$WorkingDirectory,
[hashtable]$Environment = @{},
# When set, a non-zero exit code is returned to the caller instead of
# throwing. Used by the parity and auth phases, which expect failure.
[switch]$AllowFailure
)
$process = [System.Diagnostics.Process]::new()
$process.StartInfo.FileName = $FilePath
$process.StartInfo.Arguments = ($Arguments | ForEach-Object {
ConvertTo-NativeArgument -Value $_
}) -join " "
$process.StartInfo.WorkingDirectory = $WorkingDirectory
$process.StartInfo.RedirectStandardOutput = $true
$process.StartInfo.RedirectStandardError = $true
$process.StartInfo.UseShellExecute = $false
foreach ($entry in $Environment.GetEnumerator()) {
$process.StartInfo.Environment[$entry.Key] = [string]$entry.Value
}
$commandLine = Join-CommandLine -FilePath $FilePath -Arguments $Arguments
if ($DryRun) {
Write-Host "[dry-run] $commandLine"
return [pscustomobject]@{
commandLine = $commandLine
exitCode = 0
stdout = "{}"
stderr = ""
}
}
[void]$process.Start()
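# Drain stderr asynchronously while stdout is read to completion. Reading the
# two redirected pipes back to back can deadlock once the child fills the
# stderr buffer while the parent is still blocked on stdout.
$stderrTask = $process.StandardError.ReadToEndAsync()
$stdout = $process.StandardOutput.ReadToEnd()
$process.WaitForExit()
$stderr = $stderrTask.GetAwaiter().GetResult()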
$result = [pscustomobject]@{
commandLine = $commandLine
exitCode = $process.ExitCode
stdout = $stdout
stderr = $stderr
}
if ($result.exitCode -ne 0 -and -not $AllowFailure) {
throw "Command failed with exit code $($result.exitCode): $commandLine`n$stderr`n$stdout"
}
return $result
}
function Read-JsonObject {
param([string]$Text)
if ([string]::IsNullOrWhiteSpace($Text)) {
return $null
}
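try {
# Common case: the whole text is a single JSON document.
return $Text | ConvertFrom-Json
} catch {
# Otherwise treat the text as NDJSON: parse every line that looks like a
# JSON object and skip anything else (log lines, blank lines).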
$jsonLines = @()
foreach ($line in ($Text -split "`r?`n")) {
$trimmed = $line.Trim()
if ($trimmed.StartsWith("{") -and $trimmed.EndsWith("}")) {
$jsonLines += ($trimmed | ConvertFrom-Json)
}
}
return $jsonLines
}
}
function Get-OpenSessionId {
param(
[string]$Client,
[object]$Json
)
switch ($Client) {
"dotnet" { return $Json.sessionId }
"go" { return $Json.reply.sessionId }
"rust" { return $Json.sessionId }
"python" { return $Json.sessionId }
"java" { return $Json.reply.sessionId }
}
}
function Get-ServerHandle {
param(
[string]$Client,
[object]$Json
)
switch ($Client) {
"dotnet" { return [int]$Json.register.serverHandle }
"go" { return [int]$Json.reply.register.serverHandle }
"rust" { return [int]$Json.serverHandle }
"python" { return [int]$Json.serverHandle }
"java" { return [int]$Json.reply.register.serverHandle }
}
}
function Get-ItemHandle {
param(
[string]$Client,
[object]$Json
)
switch ($Client) {
"dotnet" { return [int]$Json.addItem.itemHandle }
"go" { return [int]$Json.reply.addItem.itemHandle }
"rust" { return [int]$Json.itemHandle }
"python" { return [int]$Json.itemHandle }
"java" { return [int]$Json.reply.addItem.itemHandle }
}
}
function Get-StreamEventCount {
param(
[string]$Client,
[object]$Json
)
switch ($Client) {
"dotnet" { return @($Json.events).Count }
"go" { return @($Json).Count }
"rust" { return [int]$Json.eventCount }
"python" { return @($Json.events).Count }
"java" { return @($Json).Count }
}
}
# Returns the per-event objects from a stream-events reply so the write
# round-trip can inspect their values. Mirrors Get-StreamEventCount: .NET,
# Rust, and Python aggregate under `events`; Go and Java emit one event
# object per line (Read-JsonObject collapses NDJSON into an array).
function Get-StreamEvents {
param(
[string]$Client,
[object]$Json
)
switch ($Client) {
"dotnet" { return @($Json.events) }
"go" { return @($Json) }
"rust" { return @($Json.events) }
"python" { return @($Json.events) }
"java" { return @($Json) }
}
}
# Counts the messages in a stream-alarms reply. The CLIs shape the aggregate
# JSON differently: .NET nests them under `alarms`, Rust under `messages` with
# a `messageCount`, Python under `messages`; Go and Java emit one AlarmFeedMessage
# object per line (Read-JsonObject collapses NDJSON into a bare array).
function Get-AlarmMessageCount {
param(
[string]$Client,
[object]$Json
)
switch ($Client) {
"dotnet" { return @($Json.alarms).Count }
"go" { return @($Json).Count }
"rust" { return [int]$Json.messageCount }
"python" { return @($Json.messages).Count }
"java" { return @($Json).Count }
}
}
function Get-PropertyValue {
param(
[object]$Object,
[string[]]$Names
)
if ($null -eq $Object) {
return $null
}
foreach ($name in $Names) {
$property = $Object.PSObject.Properties[$name]
if ($null -ne $property) {
return $property.Value
}
}
return $null
}
# Extracts the item handle from a streamed event, tolerating camelCase
# (protobuf-JSON) and snake_case (some MessageToDict shapes).
function Get-EventItemHandle {
param([object]$Event)
$handle = Get-PropertyValue -Object $Event -Names @("itemHandle", "item_handle")
if ($null -eq $handle) {
return $null
}
return [int]$handle
}
# Extracts the event family as a string. All five CLIs render it as the
# protobuf enum name (e.g. MX_EVENT_FAMILY_ON_WRITE_COMPLETE).
function Get-EventFamily {
param([object]$Event)
return [string](Get-PropertyValue -Object $Event -Names @("family"))
}
# Extracts the scalar payload from a streamed event's MxValue as a string.
# The MxValue oneof renders to one protobuf-JSON `*Value` key; all five
# CLIs (after the Rust stream-events extension) emit the same key names.
function Get-EventScalar {
param([object]$Event)
$value = Get-PropertyValue -Object $Event -Names @("value")
if ($null -eq $value) {
return $null
}
foreach ($key in @("boolValue", "int32Value", "int64Value", "floatValue", "doubleValue", "stringValue")) {
$property = $value.PSObject.Properties[$key]
if ($null -ne $property -and $null -ne $property.Value) {
return [string]$property.Value
}
}
return $null
}
# Compares a written value against an observed event scalar. Numeric values
# are compared numerically (so 42 matches 42.0); everything else compares as
# a trimmed, case-insensitive string.
function Test-ValueEquals {
param(
[string]$Expected,
[string]$Observed
)
if ([string]::IsNullOrWhiteSpace($Observed)) {
return $false
}
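$expectedNumber = 0.0
$observedNumber = 0.0
# Parse with the invariant culture so "42.5" compares the same on hosts whose
# locale uses a decimal comma.
$styles = [Globalization.NumberStyles]::Float
$culture = [Globalization.CultureInfo]::InvariantCulture
if ([double]::TryParse($Expected, $styles, $culture, [ref]$expectedNumber) -and
[double]::TryParse($Observed, $styles, $culture, [ref]$observedNumber)) {
return $expectedNumber -eq $observedNumber
}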
return [string]::Equals($Expected.Trim(), $Observed.Trim(), [StringComparison]::OrdinalIgnoreCase)
}
function Get-BulkResults {
param(
[string]$Client,
[string]$Operation,
[object]$Json
)
if ($Client -in @("go", "rust", "python", "java")) {
return @(Get-PropertyValue -Object $Json -Names @("results"))
}
# .NET emits the full MxCommandReply via protobuf JSON, with results
# nested under a per-command field name.
$replyName = switch ($Operation) {
"subscribe-bulk" { "subscribeBulk" }
"unsubscribe-bulk" { "unsubscribeBulk" }
"read-bulk" { "readBulk" }
"write-bulk" { "writeBulk" }
"write2-bulk" { "write2Bulk" }
"write-secured-bulk" { "writeSecuredBulk" }
"write-secured2-bulk" { "writeSecured2Bulk" }
default { $Operation }
}
$reply = Get-PropertyValue -Object $Json -Names @($replyName)
return @(Get-PropertyValue -Object $reply -Names @("results"))
}
function Get-BulkItemHandles {
param([object[]]$Results)
return @($Results | ForEach-Object {
[int](Get-PropertyValue -Object $_ -Names @("itemHandle", "item_handle"))
} | Where-Object {
$_ -gt 0
})
}
function Assert-BulkResults {
param(
[string]$Client,
[string]$Operation,
[object[]]$Results,
[int]$ExpectedCount
)
if ($Results.Count -ne $ExpectedCount) {
throw "$Client $Operation returned $($Results.Count) result(s); expected $ExpectedCount."
}
foreach ($result in $Results) {
$success = Get-PropertyValue -Object $result -Names @("wasSuccessful", "was_successful")
if ($null -ne $success -and -not [bool]$success) {
$tagAddress = Get-PropertyValue -Object $result -Names @("tagAddress", "tag_address")
$errorMessage = Get-PropertyValue -Object $result -Names @("errorMessage", "error_message")
throw "$Client $Operation failed for '$tagAddress': $errorMessage"
}
}
}
# Builds the dotnet and Java client CLIs once up front and records the path to
# each compiled artifact, so the long-lived `batch` process is launched from
# the compiled exe / installed launcher without paying a `dotnet build` or
# `gradle` step at flow time. The Go, Rust, and Python batch processes are
# launched via `go run` / `cargo run` / `python -m`, which compile-or-start
# once when that single per-client process starts.
function Initialize-ClientBuilds {
if ($Clients -contains "dotnet") {
$cliProject = Join-Path $repoRoot "clients/dotnet/ZB.MOM.WW.MxGateway.Client.Cli/ZB.MOM.WW.MxGateway.Client.Cli.csproj"
$script:dotnetCliExe = Join-Path $repoRoot `
"clients/dotnet/ZB.MOM.WW.MxGateway.Client.Cli/bin/Debug/net10.0/ZB.MOM.WW.MxGateway.Client.Cli.exe"
if (-not $DryRun) {
Write-Host "Building the .NET client CLI once: $cliProject"
Invoke-NativeCommand -FilePath "dotnet" `
-Arguments @("build", $cliProject, "-c", "Debug", "--nologo", "-v", "quiet") `
-WorkingDirectory $repoRoot | Out-Null
if (-not (Test-Path $script:dotnetCliExe)) {
throw "The .NET client CLI build did not produce '$script:dotnetCliExe'."
}
}
}
if ($Clients -contains "java") {
$script:javaCliBat = Join-Path $repoRoot `
"clients/java/zb-mom-ww-mxgateway-cli/build/install/zb-mom-ww-mxgateway-cli/bin/zb-mom-ww-mxgateway-cli.bat"
if (-not $DryRun) {
$gradleCommand = Get-Command "gradle.bat", "gradle.cmd", "gradle.exe", "gradle" `
-ErrorAction SilentlyContinue | Select-Object -First 1
if ($null -eq $gradleCommand) {
throw "The 'gradle' command was not found on PATH; the Java client e2e flow requires Gradle."
}
Write-Host "Installing the Java client CLI once via :zb-mom-ww-mxgateway-cli:installDist"
Invoke-NativeCommand -FilePath "cmd.exe" `
-Arguments @("/c", $gradleCommand.Source, "--quiet", ":zb-mom-ww-mxgateway-cli:installDist") `
-WorkingDirectory (Join-Path $repoRoot "clients/java") | Out-Null
if (-not (Test-Path $script:javaCliBat)) {
throw "The Java client CLI install did not produce '$script:javaCliBat'."
}
}
}
}
function Get-ClientCommand {
param(
[string]$Client,
[string]$Operation,
[hashtable]$Values,
# The environment variable the client reads the API key from. Defaults
# to the run-wide -ApiKeyEnv; the auth phase overrides it to drive a
# missing-key or insufficient-scope rejection.
[string]$ApiKeyEnvName = $ApiKeyEnv
)
$httpEndpoint = ConvertTo-HttpEndpoint -Value $Endpoint
$hostEndpoint = ConvertTo-HostEndpoint -Value $Endpoint
$clientName = "mxgw-$Client-e2e"
$streamMaxEvents = if ($Values.ContainsKey("maxEvents")) { [int]$Values.maxEvents } else { $EventLimit }
# Python's stream-events call ends on a wall-clock timeout; give it enough
# headroom to drain a large write-echo budget.
$pythonStreamTimeout = [Math]::Max(15, [int][Math]::Ceiling($streamMaxEvents / 4.0))
switch ($Client) {
"dotnet" {
$arguments = @(
$Operation,
"--endpoint", $httpEndpoint,
"--api-key-env", $ApiKeyEnvName,
"--timeout", "60s",
"--json"
)
if ($Operation -eq "open-session") {
$arguments += @("--client-name", $clientName)
} elseif ($Operation -eq "register") {
$arguments += @("--session-id", $Values.sessionId, "--client-name", $clientName)
} elseif ($Operation -eq "add-item") {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item", $Values.item)
} elseif ($Operation -eq "advise") {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)")
} elseif ($Operation -eq "subscribe-bulk") {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items)
} elseif ($Operation -eq "unsubscribe-bulk") {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles)
} elseif ($Operation -eq "read-bulk") {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items)
if ($Values.ContainsKey("timeoutMs")) { $arguments += @("--timeout-ms", "$($Values.timeoutMs)") }
} elseif ($Operation -eq "write-bulk") {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)",
"--item-handles", $Values.itemHandles, "--type", $Values.valueType, "--values", $Values.values)
if ($Values.ContainsKey("userId")) { $arguments += @("--user-id", "$($Values.userId)") }
} elseif ($Operation -eq "write") {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--type", $Values.valueType, "--value", $Values.value)
} elseif ($Operation -eq "stream-events") {
$arguments += @("--session-id", $Values.sessionId, "--max-events", "$streamMaxEvents")
} elseif ($Operation -eq "stream-alarms") {
$arguments += @("--max-events", "$streamMaxEvents")
if ($Values.ContainsKey("filterPrefix")) { $arguments += @("--filter-prefix", $Values.filterPrefix) }
} elseif ($Operation -eq "acknowledge-alarm") {
$arguments += @("--reference", $Values.alarmReference)
if ($Values.ContainsKey("comment")) { $arguments += @("--comment", $Values.comment) }
if ($Values.ContainsKey("operator")) { $arguments += @("--operator", $Values.operator) }
} elseif ($Operation -eq "close-session") {
$arguments += @("--session-id", $Values.sessionId)
}
return [pscustomobject]@{ file = $script:dotnetCliExe; args = $arguments; cwd = $repoRoot; env = @{} }
}
"go" {
$arguments = @(
"run", "./cmd/mxgw-go", $Operation,
"-endpoint", $hostEndpoint,
"-api-key-env", $ApiKeyEnvName,
"-plaintext",
"-json"
)
if ($Operation -eq "open-session") {
$arguments += @("-client-session-name", $clientName)
} elseif ($Operation -eq "register") {
$arguments += @("-session-id", $Values.sessionId, "-client-name", $clientName)
} elseif ($Operation -eq "add-item") {
$arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-item", $Values.item)
} elseif ($Operation -eq "advise") {
$arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-item-handle", "$($Values.itemHandle)")
} elseif ($Operation -eq "subscribe-bulk") {
$arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-items", $Values.items)
} elseif ($Operation -eq "unsubscribe-bulk") {
$arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-item-handles", $Values.itemHandles)
} elseif ($Operation -eq "read-bulk") {
$arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-items", $Values.items)
if ($Values.ContainsKey("timeoutMs")) { $arguments += @("-timeout-ms", "$($Values.timeoutMs)") }
} elseif ($Operation -eq "write-bulk") {
$arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)",
"-item-handles", $Values.itemHandles, "-type", $Values.valueType, "-values", $Values.values)
if ($Values.ContainsKey("userId")) { $arguments += @("-user-id", "$($Values.userId)") }
} elseif ($Operation -eq "write") {
$arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-item-handle", "$($Values.itemHandle)", "-type", $Values.valueType, "-value", $Values.value)
} elseif ($Operation -eq "stream-events") {
$arguments += @("-session-id", $Values.sessionId, "-limit", "$streamMaxEvents")
} elseif ($Operation -eq "stream-alarms") {
$arguments += @("-limit", "$streamMaxEvents")
if ($Values.ContainsKey("filterPrefix")) { $arguments += @("-filter-prefix", $Values.filterPrefix) }
} elseif ($Operation -eq "acknowledge-alarm") {
$arguments += @("-reference", $Values.alarmReference)
if ($Values.ContainsKey("comment")) { $arguments += @("-comment", $Values.comment) }
if ($Values.ContainsKey("operator")) { $arguments += @("-operator", $Values.operator) }
} elseif ($Operation -eq "close-session") {
$arguments += @("-session-id", $Values.sessionId)
}
return [pscustomobject]@{ file = "go"; args = $arguments; cwd = (Join-Path $repoRoot "clients/go"); env = @{} }
}
"rust" {
$arguments = @(
"run", "-p", "mxgw-cli", "--", $Operation,
"--endpoint", $httpEndpoint,
"--api-key-env", $ApiKeyEnvName,
"--json"
)
if ($Operation -eq "open-session") {
$arguments += @("--client-name", $clientName)
} elseif ($Operation -eq "register") {
$arguments += @("--session-id", $Values.sessionId, "--client-name", $clientName)
} elseif ($Operation -eq "add-item") {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item", $Values.item)
} elseif ($Operation -eq "advise") {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)")
} elseif ($Operation -eq "subscribe-bulk") {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items)
} elseif ($Operation -eq "unsubscribe-bulk") {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles)
} elseif ($Operation -eq "read-bulk") {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items)
if ($Values.ContainsKey("timeoutMs")) { $arguments += @("--timeout-ms", "$($Values.timeoutMs)") }
} elseif ($Operation -eq "write-bulk") {
# Rust uses --value-type for the type flag.
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)",
"--item-handles", $Values.itemHandles, "--value-type", $Values.valueType, "--values", $Values.values)
if ($Values.ContainsKey("userId")) { $arguments += @("--user-id", "$($Values.userId)") }
} elseif ($Operation -eq "write") {
# Rust names the type flag --value-type, unlike the other CLIs.
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--value-type", $Values.valueType, "--value", $Values.value)
} elseif ($Operation -eq "stream-events") {
$arguments += @("--session-id", $Values.sessionId, "--max-events", "$streamMaxEvents")
} elseif ($Operation -eq "stream-alarms") {
$arguments += @("--max-events", "$streamMaxEvents")
if ($Values.ContainsKey("filterPrefix")) { $arguments += @("--filter-prefix", $Values.filterPrefix) }
} elseif ($Operation -eq "acknowledge-alarm") {
$arguments += @("--reference", $Values.alarmReference)
if ($Values.ContainsKey("comment")) { $arguments += @("--comment", $Values.comment) }
if ($Values.ContainsKey("operator")) { $arguments += @("--operator", $Values.operator) }
} elseif ($Operation -eq "close-session") {
$arguments += @("--session-id", $Values.sessionId)
}
return [pscustomobject]@{ file = "cargo"; args = $arguments; cwd = (Join-Path $repoRoot "clients/rust"); env = @{} }
}
"python" {
$arguments = @(
"-m", "zb_mom_ww_mxgateway_cli", $Operation,
"--endpoint", $hostEndpoint,
"--api-key-env", $ApiKeyEnvName,
"--plaintext",
"--json"
)
if ($Operation -eq "open-session") {
$arguments += @("--client-name", $clientName)
} elseif ($Operation -eq "register") {
$arguments += @("--session-id", $Values.sessionId, "--client-name", $clientName)
} elseif ($Operation -eq "add-item") {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item", $Values.item)
} elseif ($Operation -eq "advise") {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)")
} elseif ($Operation -eq "subscribe-bulk") {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items)
} elseif ($Operation -eq "unsubscribe-bulk") {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles)
} elseif ($Operation -eq "read-bulk") {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items)
if ($Values.ContainsKey("timeoutMs")) { $arguments += @("--timeout-ms", "$($Values.timeoutMs)") }
} elseif ($Operation -eq "write-bulk") {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)",
"--item-handles", $Values.itemHandles, "--type", $Values.valueType, "--values", $Values.values)
if ($Values.ContainsKey("userId")) { $arguments += @("--user-id", "$($Values.userId)") }
} elseif ($Operation -eq "write") {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--type", $Values.valueType, "--value", $Values.value)
} elseif ($Operation -eq "stream-events") {
$arguments += @("--session-id", $Values.sessionId, "--max-events", "$streamMaxEvents", "--timeout", "$pythonStreamTimeout")
} elseif ($Operation -eq "stream-alarms") {
$arguments += @("--max-messages", "$streamMaxEvents", "--timeout", "$pythonStreamTimeout")
if ($Values.ContainsKey("filterPrefix")) { $arguments += @("--filter-prefix", $Values.filterPrefix) }
} elseif ($Operation -eq "acknowledge-alarm") {
$arguments += @("--reference", $Values.alarmReference)
if ($Values.ContainsKey("comment")) { $arguments += @("--comment", $Values.comment) }
if ($Values.ContainsKey("operator")) { $arguments += @("--operator", $Values.operator) }
} elseif ($Operation -eq "close-session") {
$arguments += @("--session-id", $Values.sessionId)
}
$env = @{
PYTHONPATH = Join-Path $repoRoot "clients/python/src"
}
return [pscustomobject]@{ file = "py"; args = @("-3.12") + $arguments; cwd = (Join-Path $repoRoot "clients/python"); env = $env }
}
"java" {
$cliArgs = @(
$Operation,
"--endpoint", $hostEndpoint,
"--api-key-env", $ApiKeyEnvName,
"--plaintext",
"--json"
)
if ($Operation -eq "open-session") {
$cliArgs += @("--client-session-name", $clientName)
} elseif ($Operation -eq "register") {
$cliArgs += @("--session-id", $Values.sessionId, "--client-name", $clientName)
} elseif ($Operation -eq "add-item") {
$cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item", $Values.item)
} elseif ($Operation -eq "advise") {
$cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)")
} elseif ($Operation -eq "subscribe-bulk") {
$cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items)
} elseif ($Operation -eq "unsubscribe-bulk") {
$cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles)
} elseif ($Operation -eq "read-bulk") {
$cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items)
if ($Values.ContainsKey("timeoutMs")) { $cliArgs += @("--timeout-ms", "$($Values.timeoutMs)") }
} elseif ($Operation -eq "write-bulk") {
$cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)",
"--item-handles", $Values.itemHandles, "--type", $Values.valueType, "--values", $Values.values)
if ($Values.ContainsKey("userId")) { $cliArgs += @("--user-id", "$($Values.userId)") }
} elseif ($Operation -eq "write") {
$cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--type", $Values.valueType, "--value", $Values.value)
} elseif ($Operation -eq "stream-events") {
$cliArgs += @("--session-id", $Values.sessionId, "--limit", "$streamMaxEvents")
} elseif ($Operation -eq "stream-alarms") {
$cliArgs += @("--limit", "$streamMaxEvents")
if ($Values.ContainsKey("filterPrefix")) { $cliArgs += @("--filter-prefix", $Values.filterPrefix) }
} elseif ($Operation -eq "acknowledge-alarm") {
$cliArgs += @("--reference", $Values.alarmReference)
if ($Values.ContainsKey("comment")) { $cliArgs += @("--comment", $Values.comment) }
if ($Values.ContainsKey("operator")) { $cliArgs += @("--operator", $Values.operator) }
} elseif ($Operation -eq "close-session") {
$cliArgs += @("--session-id", $Values.sessionId)
}
# The Java CLI is installed once up front (gradle
# :zb-mom-ww-mxgateway-cli:installDist) so each call runs the generated
# launcher script directly instead of paying Gradle configuration
# plus a JVM cold-start per invocation. .NET's Process.Start
# (UseShellExecute=false) cannot launch a .bat directly, so the
# launcher runs through cmd.exe.
return [pscustomobject]@{
file = "cmd.exe"
args = @("/c", $script:javaCliBat) + $cliArgs
cwd = (Join-Path $repoRoot "clients/java")
env = @{}
}
}
}
}
# Synthesizes a dry-run JSON reply for an operation so the flow can be
# validated without a live gateway.
function Get-DryRunReply {
param(
[string]$Client,
[string]$Operation,
[hashtable]$Values
)
switch ($Operation) {
"open-session" { return [pscustomobject]@{ sessionId = "dry-run-session-$Client"; reply = [pscustomobject]@{ sessionId = "dry-run-session-$Client" } } }
"register" { return [pscustomobject]@{ serverHandle = 1; register = [pscustomobject]@{ serverHandle = 1 }; reply = [pscustomobject]@{ register = [pscustomobject]@{ serverHandle = 1 } } } }
"add-item" { return [pscustomobject]@{ itemHandle = 1; addItem = [pscustomobject]@{ itemHandle = 1 }; reply = [pscustomobject]@{ addItem = [pscustomobject]@{ itemHandle = 1 } } } }
"write" { return [pscustomobject]@{ ok = $true; operation = "write"; reply = [pscustomobject]@{} } }
"subscribe-bulk" {
$results = @($Values.items -split "," | ForEach-Object -Begin { $index = 1 } -Process {
[pscustomobject]@{ itemHandle = $index++; tagAddress = $_; wasSuccessful = $true }
})
return [pscustomobject]@{ subscribeBulk = [pscustomobject]@{ results = $results }; results = $results }
}
"unsubscribe-bulk" {
$results = @($Values.itemHandles -split "," | ForEach-Object {
[pscustomobject]@{ itemHandle = [int]$_; wasSuccessful = $true }
})
return [pscustomobject]@{ unsubscribeBulk = [pscustomobject]@{ results = $results }; results = $results }
}
"read-bulk" {
$results = @($Values.items -split "," | ForEach-Object -Begin { $index = 1 } -Process {
[pscustomobject]@{
itemHandle = $index++
tagAddress = $_
wasSuccessful = $true
wasCached = $true
}
})
return [pscustomobject]@{ readBulk = [pscustomobject]@{ results = $results }; results = $results }
}
"write-bulk" {
$results = @($Values.itemHandles -split "," | ForEach-Object {
[pscustomobject]@{ itemHandle = [int]$_; wasSuccessful = $true }
})
return [pscustomobject]@{ writeBulk = [pscustomobject]@{ results = $results }; results = $results }
}
"stream-events" {
# Synthesize an OnDataChange (carrying the written value) and an
# OnWriteComplete so the write round-trip assertion passes under
# -DryRun. The reply is shaped per client: Go and Java emit one
# event object per line (Read-JsonObject collapses NDJSON to a
# bare array), the others aggregate the events under `events`.
$itemHandle = if ($Values.ContainsKey("echoItemHandle")) { [int]$Values.echoItemHandle } else { 1 }
$echoValue = if ($Values.ContainsKey("echoValue")) { $Values.echoValue } else { 1 }
$dataEvent = [pscustomobject]@{
workerSequence = 1
family = "MX_EVENT_FAMILY_ON_DATA_CHANGE"
itemHandle = $itemHandle
value = [pscustomobject]@{ int32Value = $echoValue }
}
$writeCompleteEvent = [pscustomobject]@{
workerSequence = 2
family = "MX_EVENT_FAMILY_ON_WRITE_COMPLETE"
itemHandle = $itemHandle
}
$events = @($dataEvent, $writeCompleteEvent)
switch ($Client) {
"go" { return ,$events }
"java" { return ,$events }
"rust" { return [pscustomobject]@{ eventCount = $events.Count; events = $events } }
default { return [pscustomobject]@{ events = $events } }
}
}
"stream-alarms" {
# Synthesize an active-alarm snapshot followed by the
# snapshot-complete sentinel. The reply is shaped per client:
# Go and Java emit one message object per line (Read-JsonObject
# collapses NDJSON to a bare array), Rust aggregates under
# `messages` with a `messageCount`, Python under `messages`, and
# .NET under `alarms`.
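# Shape sketch of the replies built below (JSON as the harness would parse
# it; <activeAlarm> and <snapshotComplete> are abbreviations for the two
# synthetic messages, not real gateway output):
#   go, java : [ <activeAlarm>, <snapshotComplete> ]
#   rust     : { "messageCount": 2, "messages": [ <activeAlarm>, <snapshotComplete> ] }
#   python   : { "messages": [ <activeAlarm>, <snapshotComplete> ] }
#   dotnet   : { "alarms": [ <activeAlarm>, <snapshotComplete> ] }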
$activeAlarm = [pscustomobject]@{
activeAlarm = [pscustomobject]@{
alarmFullReference = "Galaxy!TestArea.TestMachine_001.TestAlarm001"
currentState = "ALARM_CONDITION_STATE_ACTIVE"
severity = 500
}
}
$snapshotComplete = [pscustomobject]@{ snapshotComplete = $true }
$messages = @($activeAlarm, $snapshotComplete)
switch ($Client) {
"go" { return ,$messages }
"java" { return ,$messages }
"rust" { return [pscustomobject]@{ messageCount = $messages.Count; messages = $messages } }
"dotnet" { return [pscustomobject]@{ alarms = $messages } }
default { return [pscustomobject]@{ messages = $messages } }
}
}
"acknowledge-alarm" {
return [pscustomobject]@{
rawReply = [pscustomobject]@{ hresult = 0; diagnosticMessage = "dry-run ack" }
reply = [pscustomobject]@{ hresult = 0 }
}
}
default { return [pscustomobject]@{ ok = $true; reply = [pscustomobject]@{} } }
}
}
# --- Batch-mode client process ---------------------------------------------
# The e2e flow issues ~250 operations per client. Spawning one CLI process per
# operation pays a process cold-start (and, for the JVM, a runtime cold-start)
# every time. Instead each client CLI exposes a `batch` subcommand: a single
# long-lived process that reads one command line from stdin, runs it, writes
# the JSON result, then a line containing exactly $batchTerminator. The harness
# drives that one process per client, so startup is paid once.
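# Illustrative exchange for one operation (a sketch of the framing described
# above; <...> marks placeholders, not real values):
#   stdin  <- open-session --endpoint <endpoint> --api-key-env <name> ...
#   stdout -> <JSON result, one or more lines>
#   stdout -> __MXGW_BATCH_EOR__
# A failed operation is assumed to use the same framing, with a JSON object
# carrying a top-level `error` field in place of the result; the process then
# keeps reading further command lines.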
$script:batchTerminator = "__MXGW_BATCH_EOR__"
$script:currentBatchClient = $null
# A redirected child's StandardInput writer is created with Console.InputEncoding,
# which is UTF-8 *with a BOM* on this host. The writer then prepends that BOM to
# the first bytes it sends, and the CLIs parse it into their first argument.
# Switching the console input encoding to a BOM-less encoding before any batch
# process starts makes that writer BOM-free. The e2e command lines are ASCII.
try {
[Console]::InputEncoding = [System.Text.Encoding]::ASCII
} catch {
Write-Warning "Could not set a BOM-less console input encoding: $($_.Exception.Message)"
}
# Derives the `batch`-process launch spec from Get-ClientCommand: the launch
# prefix is whatever precedes the operation token (e.g. `run -p mxgw-cli --`),
# with the operation itself replaced by `batch`.
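# Example (a sketch using the Java command built by Get-ClientCommand; the
# launcher path is whatever $script:javaCliBat holds):
#   open-session args : /c <javaCliBat> open-session --endpoint <endpoint> ...
#   launch prefix     : /c <javaCliBat>
#   batch launch      : cmd.exe /c <javaCliBat> batch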
function Get-BatchLaunchSpec {
param([string]$Client)
$command = Get-ClientCommand -Client $Client -Operation "open-session" -Values @{} -ApiKeyEnvName $ApiKeyEnv
$argList = [object[]]$command.args
$operationIndex = [Array]::IndexOf($argList, "open-session")
if ($operationIndex -lt 0) {
throw "Cannot locate the operation token in the '$Client' command line."
}
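# Wrap the `if` in @() so a one-element prefix stays an array: assigning the
# output of an `if` statement unrolls it, and a lone string would be
# concatenated with "batch" instead of being followed by it.
$prefix = @(if ($operationIndex -gt 0) { $argList[0..($operationIndex - 1)] })
return [pscustomobject]@{
file = $command.file
args = $prefix + @("batch")
cwd = $command.cwd
env = $command.env
}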
}
# Returns just the operation arguments (operation token + flags) for a client
# command, stripping the launch prefix — this is the line written to the batch
# process for one operation.
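# Example (a sketch for the Java client; <...> marks placeholders):
#   full args    : /c <javaCliBat> close-session --endpoint <endpoint> --api-key-env <name> --plaintext --json --session-id <id>
#   line written : close-session --endpoint <endpoint> --api-key-env <name> --plaintext --json --session-id <id>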
function Get-ClientOperationArgs {
param(
[string]$Client,
[string]$Operation,
[hashtable]$Values,
[string]$ApiKeyEnvName = $ApiKeyEnv
)
$command = Get-ClientCommand -Client $Client -Operation $Operation -Values $Values -ApiKeyEnvName $ApiKeyEnvName
$argList = [object[]]$command.args
$operationIndex = [Array]::IndexOf($argList, $Operation)
if ($operationIndex -le 0) {
return @($argList)
}
return @($argList[$operationIndex..($argList.Count - 1)])
}
# True when a parsed command reply is the CLI's failure envelope rather than a
# normal result. All five CLIs emit a top-level `error` field on failure.
function Test-OperationFailed {
param([object]$Json)
if ($null -eq $Json) {
return $true
}
$errorValue = Get-PropertyValue -Object $Json -Names @("error")
return -not [string]::IsNullOrEmpty([string]$errorValue)
}
# Starts the long-lived `batch` process for a client and returns a handle
# carrying the process and its redirected stdin/stdout streams.
function Start-BatchClient {
param([string]$Client)
$spec = Get-BatchLaunchSpec -Client $Client
$startInfo = [System.Diagnostics.ProcessStartInfo]::new()
$startInfo.FileName = $spec.file
$startInfo.Arguments = ($spec.args | ForEach-Object { ConvertTo-NativeArgument -Value $_ }) -join " "
$startInfo.WorkingDirectory = $spec.cwd
$startInfo.RedirectStandardInput = $true
$startInfo.RedirectStandardOutput = $true
# stderr is left attached to the console: the CLIs only log diagnostics
# there, and not redirecting it removes any risk of the child blocking on a
# full stderr pipe while the harness reads stdout.
$startInfo.RedirectStandardError = $false
$startInfo.UseShellExecute = $false
foreach ($entry in $spec.env.GetEnumerator()) {
$startInfo.Environment[$entry.Key] = [string]$entry.Value
}
$process = [System.Diagnostics.Process]::new()
$process.StartInfo = $startInfo
[void]$process.Start()
return [pscustomobject]@{ client = $Client; process = $process; input = $process.StandardInput }
}
# Sends one operation to a batch process and returns its raw JSON output text
# (everything written before the terminator line).
function Invoke-BatchOperation {
param(
[pscustomobject]$BatchClient,
[string]$Client,
[string]$Operation,
[hashtable]$Values,
[string]$ApiKeyEnvName
)
$operationArgs = Get-ClientOperationArgs -Client $Client -Operation $Operation `
-Values $Values -ApiKeyEnvName $ApiKeyEnvName
$process = $BatchClient.process
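# The arguments are joined with single spaces and no quoting is applied here,
# so this relies on no argument value containing whitespace.
$BatchClient.input.WriteLine(($operationArgs -join " "))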
$BatchClient.input.Flush()
$builder = [System.Text.StringBuilder]::new()
while ($true) {
$line = $process.StandardOutput.ReadLine()
if ($null -eq $line) {
throw ("Batch client '$Client' closed its output before terminating operation " +
"'$Operation' (process exited: $($process.HasExited)).")
}
if ($line -eq $script:batchTerminator) {
break
}
[void]$builder.AppendLine($line)
}
return $builder.ToString()
}
# Signals end-of-input to a batch process and waits for it to exit.
function Stop-BatchClient {
param([pscustomobject]$BatchClient)
if ($null -eq $BatchClient) {
return
}
$process = $BatchClient.process
try {
if (-not $process.HasExited) {
$BatchClient.input.Close()
if (-not $process.WaitForExit(15000)) {
$process.Kill($true)
}
}
} catch {
try { $process.Kill($true) } catch { }
} finally {
$process.Dispose()
}
}
function Invoke-ClientOperation {
param(
[string]$Client,
[string]$Operation,
[hashtable]$Values = @{},
[string]$ApiKeyEnvName = $ApiKeyEnv
)
if ($DryRun) {
$operationArgs = Get-ClientOperationArgs -Client $Client -Operation $Operation `
-Values $Values -ApiKeyEnvName $ApiKeyEnvName
Write-Host "[dry-run] (batch:$Client) $($operationArgs -join ' ')"
return Get-DryRunReply -Client $Client -Operation $Operation -Values $Values
}
$stdout = Invoke-BatchOperation -BatchClient $script:currentBatchClient -Client $Client `
-Operation $Operation -Values $Values -ApiKeyEnvName $ApiKeyEnvName
$json = Read-JsonObject -Text $stdout
if (Test-OperationFailed -Json $json) {
$errorValue = Get-PropertyValue -Object $json -Names @("error")
throw "$Client $Operation failed: $errorValue"
}
return $json
}
# Runs a client operation that is expected to fail. Returns a record whose
# `failed` flag is true when the CLI reported its failure envelope. Under
# -DryRun a synthetic failure is returned so the parity and auth phases can be
# exercised offline.
function Invoke-ClientOperationExpectingFailure {
param(
[string]$Client,
[string]$Operation,
[hashtable]$Values = @{},
[string]$ApiKeyEnvName = $ApiKeyEnv
)
if ($DryRun) {
$operationArgs = Get-ClientOperationArgs -Client $Client -Operation $Operation `
-Values $Values -ApiKeyEnvName $ApiKeyEnvName
Write-Host "[dry-run] (batch:$Client) $($operationArgs -join ' ')"
return [pscustomobject]@{ failed = $true; json = $null }
}
$stdout = Invoke-BatchOperation -BatchClient $script:currentBatchClient -Client $Client `
-Operation $Operation -Values $Values -ApiKeyEnvName $ApiKeyEnvName
$json = Read-JsonObject -Text $stdout
return [pscustomobject]@{ failed = (Test-OperationFailed -Json $json); json = $json }
}
# Connects a short-lived StreamEvents consumer so the gateway empties the worker
# event channel. The per-tag advise loop advises every discovered tag with no
# consumer attached; without periodic draining the worker event channel
# (MxGateway:Events:QueueCapacity) overflows under FailFast backpressure and
# faults the worker.
#
# A small bounded read is enough: the gateway's per-stream producer
# (EventStreamService.ProduceEventsAsync) races ahead of the CLI and pulls the
# entire worker event channel into its own buffer the instant a subscriber
# attaches, so the channel is emptied long before the CLI finishes reading
# these events. Run via the expecting-failure path so a failure reported by
# the drain is ignored: its purpose is the side effect (emptying the channel),
# not its output.
function Invoke-EventDrain {
param(
[string]$Client,
[string]$SessionId
)
Invoke-ClientOperationExpectingFailure -Client $Client -Operation "stream-events" -Values @{
sessionId = $SessionId
maxEvents = 200
} | Out-Null
}
# Runs the full e2e flow for a single language client and returns the result
# record. Discovered tags are passed in so the (slow) SQL discovery runs once.
function Invoke-ClientFlow {
param(
[string]$Client,
[object[]]$Tags
)
Write-Host "Running $Client client e2e flow against $($Tags.Count) discovered tags."
$sessionId = $null
$serverHandle = $null
$clientResult = [ordered]@{
language = $Client
sessionId = $null
serverHandle = $null
bulk = $null
addedItems = @()
eventCount = 0
write = $null
alarms = $null
parity = @()
auth = @()
closed = $false
error = $null
}
try {
if (-not $DryRun) {
$script:currentBatchClient = Start-BatchClient -Client $Client
}
$openJson = Invoke-ClientOperation -Client $Client -Operation "open-session"
$sessionId = Get-OpenSessionId -Client $Client -Json $openJson
if ([string]::IsNullOrWhiteSpace($sessionId)) {
throw "The $Client open-session command did not return a session id."
}
$clientResult.sessionId = $sessionId
$registerJson = Invoke-ClientOperation -Client $Client -Operation "register" -Values @{
sessionId = $sessionId
}
$serverHandle = Get-ServerHandle -Client $Client -Json $registerJson
$clientResult.serverHandle = $serverHandle
# --- Write round-trip + value assertion ---------------------------
# Runs right after register, before the bulk and add-item phases, so
# only a small backlog of events precedes the write. The gateway
# replays the per-session event buffer from the start, so the
# post-write OnWriteComplete must be reachable within the bounded
# -WriteEchoMaxEvents window.
if ($VerifyWrite) {
$writeTag = @($Tags | Where-Object {
$_.attributeName -eq $WriteAttribute
}) | Select-Object -First 1
if ($null -eq $writeTag) {
Write-Warning "$Client write phase skipped: no discovered tag has attribute '$WriteAttribute'."
} else {
$writeAddJson = Invoke-ClientOperation -Client $Client -Operation "add-item" -Values @{
sessionId = $sessionId
serverHandle = $serverHandle
item = $writeTag.fullTagReference
}
$writeItemHandle = Get-ItemHandle -Client $Client -Json $writeAddJson
Invoke-ClientOperation -Client $Client -Operation "advise" -Values @{
sessionId = $sessionId
serverHandle = $serverHandle
itemHandle = $writeItemHandle
} | Out-Null
$sentinelValue = "$($WriteValueBase + $script:clientFlowIndex)"
Invoke-ClientOperation -Client $Client -Operation "write" -Values @{
sessionId = $sessionId
serverHandle = $serverHandle
itemHandle = $writeItemHandle
valueType = $WriteType
value = $sentinelValue
} | Out-Null
$writeStreamJson = Invoke-ClientOperation -Client $Client -Operation "stream-events" -Values @{
sessionId = $sessionId
maxEvents = $WriteEchoMaxEvents
echoItemHandle = $writeItemHandle
echoValue = $sentinelValue
}
$writeEvents = @(Get-StreamEvents -Client $Client -Json $writeStreamJson)
$writeItemEvents = @($writeEvents | Where-Object {
(Get-EventItemHandle -Event $_) -eq $writeItemHandle
})
# The reliable write round-trip signal: MXAccess fires
# OnWriteComplete once the write reaches the provider. The
# value echo is best-effort — a provider-driven attribute
# (e.g. a simulated counter) accepts the write but does not
# hold the value, so no OnDataChange carries it back.
$writeCompleteEvent = $writeItemEvents | Where-Object {
(Get-EventFamily -Event $_) -match "WRITE_COMPLETE"
} | Select-Object -First 1
$echoEvent = $writeItemEvents | Where-Object {
Test-ValueEquals -Expected $sentinelValue -Observed (Get-EventScalar -Event $_)
} | Select-Object -First 1
if ($null -eq $writeCompleteEvent) {
throw ("$Client write round-trip failed: wrote $WriteType=$sentinelValue to " +
"'$($writeTag.fullTagReference)' (item handle $writeItemHandle) but no " +
"OnWriteComplete event was observed in $($writeEvents.Count) streamed event(s). " +
"Increase -WriteEchoMaxEvents, or drop -VerifyWrite.")
}
$clientResult.write = [ordered]@{
attributeName = $WriteAttribute
fullTagReference = $writeTag.fullTagReference
itemHandle = $writeItemHandle
valueType = $WriteType
value = $sentinelValue
writeCompleteObserved = $true
echoObserved = ($null -ne $echoEvent)
}
# WriteBulk smoke: single-entry batch against the same writable
# tag. Exercises the BulkWriteResult wire format end-to-end
# without complicating the OnWriteComplete echo assertion that
# the single-item write phase already verified above. Pinned
# to a different sentinel value so a subsequent read-bulk
# against the same tag would see the bulk write's effect.
if (-not $SkipReadWriteBulk) {
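# $sentinelValue is a string, so `$sentinelValue + 1` would concatenate
# ("100" + 1 gives "1001"); derive the next value numerically instead.
$bulkSentinel = $WriteValueBase + $script:clientFlowIndex + 1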
$writeBulkJson = Invoke-ClientOperation -Client $Client -Operation "write-bulk" -Values @{
sessionId = $sessionId
serverHandle = $serverHandle
itemHandles = "$writeItemHandle"
valueType = $WriteType
values = "$bulkSentinel"
userId = 0
}
$writeBulkResults = @(Get-BulkResults -Client $Client -Operation "write-bulk" -Json $writeBulkJson)
Assert-BulkResults -Client $Client -Operation "write-bulk" -Results $writeBulkResults -ExpectedCount 1
$clientResult.write.writeBulkValue = $bulkSentinel
$clientResult.write.writeBulkResultCount = $writeBulkResults.Count
}
}
}
if (-not $SkipBulk) {
$bulkTags = @($Tags | Select-Object -First ([Math]::Min($BulkTagCount, $Tags.Count)))
$bulkItems = ($bulkTags | ForEach-Object { $_.fullTagReference }) -join ","
$subscribeBulkJson = Invoke-ClientOperation -Client $Client -Operation "subscribe-bulk" -Values @{
sessionId = $sessionId
serverHandle = $serverHandle
items = $bulkItems
}
$subscribeResults = @(Get-BulkResults -Client $Client -Operation "subscribe-bulk" -Json $subscribeBulkJson)
Assert-BulkResults -Client $Client -Operation "subscribe-bulk" -Results $subscribeResults -ExpectedCount $bulkTags.Count
$bulkItemHandles = @(Get-BulkItemHandles -Results $subscribeResults)
if ($bulkItemHandles.Count -ne $bulkTags.Count) {
throw "$Client subscribe-bulk returned $($bulkItemHandles.Count) usable item handle(s); expected $($bulkTags.Count)."
}
# ReadBulk over the already-advised tags: every result must come
# from the per-session value cache (was_cached = true). Confirms
# the gateway/worker/cache wiring serves cached values for tags
# the caller did not create the subscription for.
$readBulkSummary = $null
if (-not $SkipReadWriteBulk) {
$readBulkJson = Invoke-ClientOperation -Client $Client -Operation "read-bulk" -Values @{
sessionId = $sessionId
serverHandle = $serverHandle
items = $bulkItems
timeoutMs = 1500
}
$readResults = @(Get-BulkResults -Client $Client -Operation "read-bulk" -Json $readBulkJson)
Assert-BulkResults -Client $Client -Operation "read-bulk" -Results $readResults -ExpectedCount $bulkTags.Count
$cachedCount = @($readResults | Where-Object {
[bool](Get-PropertyValue -Object $_ -Names @("wasCached", "was_cached"))
}).Count
# Allow up to one snapshot fallback per batch: a freshly
# advised tag may not have an OnDataChange cached yet if it
# hasn't pushed an update in the small window between
# subscribe-bulk and read-bulk. Anything beyond that means
# the cached-path optimization is broken.
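# Worked example of the check below (numbers are illustrative only):
#   6 results, 1 allowed fallback: the threshold is 6 - 1 = 5
#   5 cached results pass; 4 cached results throw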
$maxSnapshotFallbacks = 1
if ($cachedCount -lt ($readResults.Count - $maxSnapshotFallbacks)) {
throw ("$Client read-bulk only returned $cachedCount cached result(s) " +
"out of $($readResults.Count); the cache-then-snapshot fork must " +
"serve cached values for already-advised tags.")
}
$readBulkSummary = [ordered]@{
tagCount = $readResults.Count
cachedCount = $cachedCount
}
}
$unsubscribeBulkJson = Invoke-ClientOperation -Client $Client -Operation "unsubscribe-bulk" -Values @{
sessionId = $sessionId
serverHandle = $serverHandle
itemHandles = $bulkItemHandles -join ","
}
$unsubscribeResults = @(Get-BulkResults -Client $Client -Operation "unsubscribe-bulk" -Json $unsubscribeBulkJson)
Assert-BulkResults -Client $Client -Operation "unsubscribe-bulk" -Results $unsubscribeResults -ExpectedCount $bulkItemHandles.Count
$clientResult.bulk = [ordered]@{
tagCount = $bulkTags.Count
subscribedCount = $subscribeResults.Count
unsubscribedCount = $unsubscribeResults.Count
itemHandles = $bulkItemHandles
readBulk = $readBulkSummary
}
}
$advisedSinceDrain = 0
foreach ($tag in $Tags) {
$addJson = Invoke-ClientOperation -Client $Client -Operation "add-item" -Values @{
sessionId = $sessionId
serverHandle = $serverHandle
item = $tag.fullTagReference
}
$itemHandle = Get-ItemHandle -Client $Client -Json $addJson
Invoke-ClientOperation -Client $Client -Operation "advise" -Values @{
sessionId = $sessionId
serverHandle = $serverHandle
itemHandle = $itemHandle
} | Out-Null
$clientResult.addedItems += [ordered]@{
tagName = $tag.tagName
attributeName = $tag.attributeName
fullTagReference = $tag.fullTagReference
itemHandle = $itemHandle
protectedWriteRequired = $tag.attributeName -eq "ProtectedValue"
}
# Drain the worker event channel every DrainEveryTags advised tags
# so this unbounded advise loop cannot overflow it and fault the
# worker before the loop completes.
$advisedSinceDrain++
if ($DrainEveryTags -gt 0 -and $advisedSinceDrain -ge $DrainEveryTags) {
Invoke-EventDrain -Client $Client -SessionId $sessionId
$advisedSinceDrain = 0
}
}
# --- Event streaming ----------------------------------------------
if (-not $SkipStream) {
$streamJson = Invoke-ClientOperation -Client $Client -Operation "stream-events" -Values @{
sessionId = $sessionId
}
$clientResult.eventCount = Get-StreamEventCount -Client $Client -Json $streamJson
if ($clientResult.eventCount -lt 1) {
throw "The $Client stream-events command returned no events."
}
}
# --- Alarm feed + acknowledge -------------------------------------
# Session-less RPCs against the gateway's always-on central alarm
# monitor. Opt-in (-VerifyAlarms) because it needs the monitor enabled
# (MxGateway:Alarms:Enabled) and a live alarm provider.
if ($VerifyAlarms) {
$alarmStreamJson = Invoke-ClientOperation -Client $Client -Operation "stream-alarms" -Values @{
maxEvents = $AlarmStreamMax
}
$alarmMessageCount = Get-AlarmMessageCount -Client $Client -Json $alarmStreamJson
if ($alarmMessageCount -lt 1) {
throw "The $Client stream-alarms command returned no alarm-feed messages."
}
# The acknowledge round-trips against the central monitor; the
# native ack outcome depends on whether the referenced alarm is
# currently active, so only the RPC's success is asserted here.
Invoke-ClientOperation -Client $Client -Operation "acknowledge-alarm" -Values @{
alarmReference = $AlarmReference
comment = "e2e-matrix"
operator = "mxgw-e2e"
} | Out-Null
$clientResult.alarms = [ordered]@{
streamMessageCount = $alarmMessageCount
acknowledgeReference = $AlarmReference
acknowledged = $true
}
}
# --- Error-path (parity) checks -----------------------------------
# MXAccess parity: an invalid item handle and an unknown session must
# both be rejected rather than silently succeeding.
if (-not $SkipParity) {
$parityChecks = @(
[ordered]@{
check = "invalid-item-handle"
operation = "advise"
values = @{ sessionId = $sessionId; serverHandle = $serverHandle; itemHandle = 2147483647 }
},
[ordered]@{
check = "unknown-session"
operation = "register"
values = @{ sessionId = [guid]::NewGuid().ToString() }
}
)
foreach ($parityCheck in $parityChecks) {
$parityResult = Invoke-ClientOperationExpectingFailure `
-Client $Client -Operation $parityCheck.operation -Values $parityCheck.values
$passed = [bool]$parityResult.failed
$clientResult.parity += [ordered]@{
check = $parityCheck.check
operation = $parityCheck.operation
passed = $passed
}
if (-not $passed) {
throw "$Client parity check '$($parityCheck.check)' expected $($parityCheck.operation) to fail but it reported no error."
}
}
}
# --- API-key auth rejection ---------------------------------------
# Runs after a working session is established, so a reported failure is
# an auth rejection rather than the gateway being unreachable.
if (-not $SkipAuth) {
$authChecks = @(
[ordered]@{ check = "missing-api-key"; apiKeyEnv = $script:missingKeyEnvName }
)
if (-not [string]::IsNullOrWhiteSpace($RejectScopeApiKeyEnv)) {
$authChecks += [ordered]@{ check = "insufficient-scope"; apiKeyEnv = $RejectScopeApiKeyEnv }
}
foreach ($authCheck in $authChecks) {
$authResult = Invoke-ClientOperationExpectingFailure `
-Client $Client -Operation "open-session" -ApiKeyEnvName $authCheck.apiKeyEnv
$passed = [bool]$authResult.failed
$clientResult.auth += [ordered]@{
check = $authCheck.check
passed = $passed
}
if (-not $passed) {
throw "$Client auth check '$($authCheck.check)' expected open-session to be rejected but it reported no error."
}
}
}
} catch {
$clientResult.error = $_.Exception.Message
Write-Warning "$Client e2e flow failed: $($clientResult.error)"
} finally {
if (-not [string]::IsNullOrWhiteSpace($sessionId)) {
try {
Invoke-ClientOperation -Client $Client -Operation "close-session" -Values @{
sessionId = $sessionId
} | Out-Null
$clientResult.closed = $true
} catch {
$clientResult.error = "$($clientResult.error) close-session failed: $($_.Exception.Message)".TrimStart()
}
}
if ($null -ne $script:currentBatchClient) {
Stop-BatchClient -BatchClient $script:currentBatchClient
$script:currentBatchClient = $null
}
}
return $clientResult
}
# Forwards every run parameter to a single-client child invocation used by
# -Parallel. -Parallel itself is intentionally omitted so the child runs the
# serial path.
function Get-ChildArgumentList {
param(
[string]$Client,
[string]$ChildReportPath
)
$childArgs = @(
"-NoProfile", "-ExecutionPolicy", "Bypass", "-File", $PSCommandPath,
"-Clients", $Client,
"-MachineStart", "$MachineStart",
"-MachineEnd", "$MachineEnd",
"-Attributes", ($Attributes -join ","),
"-Endpoint", $Endpoint,
"-ApiKeyEnv", $ApiKeyEnv,
"-SqlServer", $SqlServer,
"-Database", $Database,
"-EventLimit", "$EventLimit",
"-BulkTagCount", "$BulkTagCount",
"-DrainEveryTags", "$DrainEveryTags",
"-WriteAttribute", $WriteAttribute,
"-WriteType", $WriteType,
"-WriteValueBase", "$WriteValueBase",
"-WriteEchoMaxEvents", "$WriteEchoMaxEvents",
"-AlarmReference", $AlarmReference,
"-AlarmStreamMax", "$AlarmStreamMax",
"-ReportPath", $ChildReportPath,
"-EmitReport"
)
if (-not [string]::IsNullOrWhiteSpace($RejectScopeApiKeyEnv)) {
$childArgs += @("-RejectScopeApiKeyEnv", $RejectScopeApiKeyEnv)
}
if ($SkipStream) { $childArgs += "-SkipStream" }
if ($SkipBulk) { $childArgs += "-SkipBulk" }
if ($SkipReadWriteBulk) { $childArgs += "-SkipReadWriteBulk" }
if ($VerifyWrite) { $childArgs += "-VerifyWrite" }
if ($VerifyAlarms) { $childArgs += "-VerifyAlarms" }
if ($SkipParity) { $childArgs += "-SkipParity" }
if ($SkipAuth) { $childArgs += "-SkipAuth" }
if ($DryRun) { $childArgs += "-DryRun" }
return $childArgs
}
# An env var name that is guaranteed not to be set in this process, used to
# drive the missing-API-key auth rejection.
$script:missingKeyEnvName = "MXGW_E2E_MISSING_KEY_" + ([guid]::NewGuid().ToString("N"))
# --- Parallel mode: fan out one isolated child process per client ----------
if ($Parallel -and $Clients.Count -gt 1) {
Write-Host "Running $($Clients.Count) client e2e flows in parallel."
$childRoot = Join-Path ([System.IO.Path]::GetTempPath()) ("mxgw-e2e-" + ([guid]::NewGuid().ToString("N")))
New-Item -ItemType Directory -Path $childRoot -Force | Out-Null
$children = @()
foreach ($client in $Clients) {
$childReport = Join-Path $childRoot "$client.json"
$childLog = Join-Path $childRoot "$client.log"
$childArgs = Get-ChildArgumentList -Client $client -ChildReportPath $childReport
$process = Start-Process -FilePath "pwsh" -ArgumentList $childArgs `
-RedirectStandardOutput $childLog -RedirectStandardError "$childLog.err" `
-NoNewWindow -PassThru
$children += [pscustomobject]@{
client = $client
process = $process
report = $childReport
log = $childLog
}
}
foreach ($child in $children) {
$child.process.WaitForExit()
}
$mergedClients = @()
$discoveredTags = @()
$hadFailure = $false
foreach ($child in $children) {
foreach ($logPath in @($child.log, "$($child.log).err")) {
if ((Test-Path $logPath) -and -not [string]::IsNullOrWhiteSpace((Get-Content -Raw -Path $logPath))) {
Write-Host "----- $($child.client) -----"
Get-Content -Path $logPath | ForEach-Object { Write-Host $_ }
}
}
if ($child.process.ExitCode -ne 0) {
$hadFailure = $true
}
if (Test-Path $child.report) {
$childRun = Get-Content -Raw -Path $child.report | ConvertFrom-Json
$mergedClients += @($childRun.clients)
if ($discoveredTags.Count -eq 0) {
$discoveredTags = @($childRun.discoveredTags)
}
} else {
$hadFailure = $true
$mergedClients += [pscustomobject]@{
language = $child.client
error = "Child process exited $($child.process.ExitCode) without writing a report."
}
}
}
$run = [ordered]@{
schemaVersion = 1
endpoint = $Endpoint
apiKeyEnv = $ApiKeyEnv
machineStart = $MachineStart
machineEnd = $MachineEnd
attributes = $Attributes
eventLimit = $EventLimit
bulkTagCount = $BulkTagCount
drainEveryTags = $DrainEveryTags
skipStream = [bool]$SkipStream
skipBulk = [bool]$SkipBulk
verifyWrite = [bool]$VerifyWrite
verifyAlarms = [bool]$VerifyAlarms
skipParity = [bool]$SkipParity
skipAuth = [bool]$SkipAuth
writeAttribute = $WriteAttribute
parallel = $true
discoveredTags = $discoveredTags
clients = $mergedClients
completedAt = (Get-Date).ToUniversalTime().ToString("O")
success = -not $hadFailure
}
$reportDirectory = Split-Path -Parent $ReportPath
if (-not [string]::IsNullOrWhiteSpace($reportDirectory)) {
New-Item -ItemType Directory -Path $reportDirectory -Force | Out-Null
}
$run | ConvertTo-Json -Depth 8 | Set-Content -Path $ReportPath -Encoding UTF8
Write-Host "Wrote merged e2e report to $ReportPath"
Remove-Item -Path $childRoot -Recurse -Force -ErrorAction SilentlyContinue
if ($hadFailure) {
exit 1
}
return
}
# --- Serial mode -----------------------------------------------------------
Initialize-ClientBuilds
$discoveryJson = & $discoveryScript `
-MachineStart $MachineStart `
-MachineEnd $MachineEnd `
-Attributes $Attributes `
-SqlServer $SqlServer `
-Database $Database `
-Json
$convertedTags = $discoveryJson | ConvertFrom-Json
$tags = @($convertedTags)
if ($tags.Count -eq 1 -and $tags[0] -is [System.Array]) {
$tags = @($tags[0])
}
$expectedTagCount = (($MachineEnd - $MachineStart + 1) * $Attributes.Count)
if ($tags.Count -ne $expectedTagCount) {
$found = $tags | Group-Object -Property tagName | ForEach-Object {
"$($_.Name)=$($_.Count)"
}
throw "Expected $expectedTagCount discovered test tags, found $($tags.Count): $($found -join ', ')"
}
$run = [ordered]@{
schemaVersion = 1
endpoint = $Endpoint
apiKeyEnv = $ApiKeyEnv
machineStart = $MachineStart
machineEnd = $MachineEnd
attributes = $Attributes
eventLimit = $EventLimit
bulkTagCount = $BulkTagCount
drainEveryTags = $DrainEveryTags
skipStream = [bool]$SkipStream
skipBulk = [bool]$SkipBulk
verifyWrite = [bool]$VerifyWrite
verifyAlarms = [bool]$VerifyAlarms
skipParity = [bool]$SkipParity
skipAuth = [bool]$SkipAuth
writeAttribute = $WriteAttribute
parallel = $false
startedAt = (Get-Date).ToUniversalTime().ToString("O")
discoveredTags = $tags
clients = @()
}
$hadFailure = $false
$script:clientFlowIndex = 0
foreach ($client in $Clients) {
$clientResult = Invoke-ClientFlow -Client $client -Tags $tags
if (-not [string]::IsNullOrWhiteSpace($clientResult.error)) {
$hadFailure = $true
}
$run.clients += $clientResult
$script:clientFlowIndex++
}
$run.completedAt = (Get-Date).ToUniversalTime().ToString("O")
$run.success = -not $hadFailure
if ((-not $DryRun) -or $EmitReport) {
$reportDirectory = Split-Path -Parent $ReportPath
if (-not [string]::IsNullOrWhiteSpace($reportDirectory)) {
New-Item -ItemType Directory -Path $reportDirectory -Force | Out-Null
}
$run | ConvertTo-Json -Depth 8 | Set-Content -Path $ReportPath -Encoding UTF8
Write-Host "Wrote e2e report to $ReportPath"
}
if ($hadFailure) {
exit 1
}