<# .SYNOPSIS Cross-language client e2e matrix for the MXAccess Gateway. .DESCRIPTION Drives the .NET, Go, Rust, Python, and Java client CLIs against a running gateway + worker. For each language the script exercises session open/close, register, bulk subscribe/unsubscribe, per-tag add-item/advise, event streaming, a write round-trip with value assertion, error-path (parity) checks, and API-key auth rejection. With -VerifyAlarms it also exercises the session-less stream-alarms and acknowledge-alarm subcommands against the gateway's central alarm monitor. Each client CLI is driven through one long-lived `batch` process: the harness writes one command line to its stdin and reads the JSON result back, so the ~250 operations per client pay the process (and JVM/runtime) cold-start once instead of once per operation. The gateway and worker are assumed to be already running at -Endpoint; the script does not start or stop them. #> [CmdletBinding()] param( [string[]]$Clients = @("dotnet", "go", "rust", "python", "java"), [int]$MachineStart = 1, [int]$MachineEnd = 20, [string[]]$Attributes = @( "ProtectedValue", "TestChangingInt", "TestBoolArray", "TestIntArray", "TestDateTimeArray", "TestStringArray" ), [string]$Endpoint = "localhost:5000", [string]$ApiKeyEnv = "MXGATEWAY_API_KEY", [string]$SqlServer = "localhost", [string]$Database = "ZB", [int]$EventLimit = 5, [int]$BulkTagCount = 6, # The per-tag advise loop advises every discovered tag with no StreamEvents # consumer attached, so MXAccess change events accumulate in the worker # event channel (MxGateway:Events:QueueCapacity). Left unbounded the channel # overflows under FailFast backpressure and faults the worker — slow, # process-per-call clients (the Java CLI) hit this before the loop ends. # Every DrainEveryTags advised tags the loop connects a short-lived # StreamEvents drain so the gateway pumps that channel empty. 0 disables it. [int]$DrainEveryTags = 15, [switch]$SkipStream, [switch]$SkipBulk, # Skip the bulk read+write coverage that runs alongside the existing # subscribe-bulk phase. The read-bulk phase confirms cached-path # semantics against tags left advised by subscribe-bulk (was_cached # = true); the write-bulk phase runs when -VerifyWrite is set and # exercises the BulkWriteResult shape against the writable tag. [switch]$SkipReadWriteBulk, # Write round-trip. Opt-in because it mutates live tag state: it writes a # sentinel value to -WriteAttribute and asserts an OnWriteComplete event # confirms the write reached the MXAccess provider. [switch]$VerifyWrite, [string]$WriteAttribute = "TestChangingInt", [string]$WriteType = "int32", [int]$WriteValueBase = 424200, [int]$WriteEchoMaxEvents = 200, # Alarm feed + acknowledge coverage. Opt-in because it depends on the # gateway's central alarm monitor being enabled (MxGateway:Alarms:Enabled) # and a live alarm provider: stream-alarms reads the monitor's snapshot and # acknowledge-alarm acknowledges -AlarmReference. Both RPCs are session-less # — they exercise the gateway's always-on monitor, not a client session. [switch]$VerifyAlarms, [string]$AlarmReference = "Galaxy!TestArea.TestMachine_001.TestAlarm001", # Messages to read from the central alarm feed. 1 is enough to confirm the # subcommand round-trips: the feed's first message (an active-alarm # snapshot, or snapshot-complete when no alarms are active) always arrives # immediately, whereas later messages depend on live alarm transitions. [int]$AlarmStreamMax = 1, # Error-path (parity) checks. [switch]$SkipParity, # API-key auth rejection checks. [switch]$SkipAuth, # Optional env var holding an API key whose scopes are insufficient for # open-session; when supplied the auth phase also asserts that key is # rejected (PermissionDenied) on top of the always-on missing-key check. [string]$RejectScopeApiKeyEnv, # Run each language client concurrently as an isolated child process. [switch]$Parallel, [switch]$DryRun, [string]$ReportPath, # Internal: set by -Parallel on each spawned child so it always writes its # report (even under -DryRun) for the parent to merge. Not for direct use. [switch]$EmitReport ) Set-StrictMode -Version Latest $ErrorActionPreference = "Stop" $repoRoot = Resolve-Path (Join-Path $PSScriptRoot "..") $discoveryScript = Join-Path $PSScriptRoot "discover-testmachine-tags.ps1" $validClients = @("dotnet", "go", "rust", "python", "java") $Clients = @($Clients | ForEach-Object { $_ -split "," } | ForEach-Object { $_.Trim().ToLowerInvariant() } | Where-Object { -not [string]::IsNullOrWhiteSpace($_) }) $Attributes = @($Attributes | ForEach-Object { $_ -split "," } | ForEach-Object { $_.Trim() } | Where-Object { -not [string]::IsNullOrWhiteSpace($_) }) if ($Clients.Count -eq 0) { throw "At least one client is required." } if ($Attributes.Count -eq 0) { throw "At least one attribute is required." } if ($BulkTagCount -lt 1) { throw "BulkTagCount must be greater than zero." } if ($DrainEveryTags -lt 0) { throw "DrainEveryTags cannot be negative." } if ($WriteEchoMaxEvents -lt 1) { throw "WriteEchoMaxEvents must be greater than zero." } if ($AlarmStreamMax -lt 1) { throw "AlarmStreamMax must be greater than zero." } foreach ($client in $Clients) { if ($validClients -notcontains $client) { throw "Unsupported client '$client'. Supported clients: $($validClients -join ', ')." } } if ([string]::IsNullOrWhiteSpace($ReportPath)) { $timestamp = Get-Date -Format "yyyyMMdd-HHmmss" $ReportPath = Join-Path $repoRoot "artifacts/e2e/testmachine-client-e2e-$timestamp.json" } function ConvertTo-HttpEndpoint { param([string]$Value) if ($Value.StartsWith("http://", [StringComparison]::OrdinalIgnoreCase) -or $Value.StartsWith("https://", [StringComparison]::OrdinalIgnoreCase)) { return $Value } return "http://$Value" } function ConvertTo-HostEndpoint { param([string]$Value) $hostValue = $Value if ($hostValue.StartsWith("http://", [StringComparison]::OrdinalIgnoreCase)) { $hostValue = $hostValue.Substring(7) } elseif ($hostValue.StartsWith("https://", [StringComparison]::OrdinalIgnoreCase)) { $hostValue = $hostValue.Substring(8) } return $hostValue.TrimEnd("/") } function Join-CommandLine { param( [string]$FilePath, [string[]]$Arguments ) $parts = @($FilePath) + $Arguments return ($parts | ForEach-Object { ConvertTo-NativeArgument -Value $_ }) -join " " } function ConvertTo-NativeArgument { param([string]$Value) if ($Value -notmatch '[\s"]') { return $Value } return '"' + $Value.Replace('\', '\\').Replace('"', '\"') + '"' } function Invoke-NativeCommand { param( [string]$FilePath, [string[]]$Arguments, [string]$WorkingDirectory, [hashtable]$Environment = @{}, # When set, a non-zero exit code is returned to the caller instead of # throwing. Used by the parity and auth phases, which expect failure. [switch]$AllowFailure ) $process = [System.Diagnostics.Process]::new() $process.StartInfo.FileName = $FilePath $process.StartInfo.Arguments = ($Arguments | ForEach-Object { ConvertTo-NativeArgument -Value $_ }) -join " " $process.StartInfo.WorkingDirectory = $WorkingDirectory $process.StartInfo.RedirectStandardOutput = $true $process.StartInfo.RedirectStandardError = $true $process.StartInfo.UseShellExecute = $false foreach ($entry in $Environment.GetEnumerator()) { $process.StartInfo.Environment[$entry.Key] = [string]$entry.Value } $commandLine = Join-CommandLine -FilePath $FilePath -Arguments $Arguments if ($DryRun) { Write-Host "[dry-run] $commandLine" return [pscustomobject]@{ commandLine = $commandLine exitCode = 0 stdout = "{}" stderr = "" } } [void]$process.Start() $stdout = $process.StandardOutput.ReadToEnd() $stderr = $process.StandardError.ReadToEnd() $process.WaitForExit() $result = [pscustomobject]@{ commandLine = $commandLine exitCode = $process.ExitCode stdout = $stdout stderr = $stderr } if ($result.exitCode -ne 0 -and -not $AllowFailure) { throw "Command failed with exit code $($result.exitCode): $commandLine`n$stderr`n$stdout" } return $result } function Read-JsonObject { param([string]$Text) if ([string]::IsNullOrWhiteSpace($Text)) { return $null } try { return $Text | ConvertFrom-Json } catch { $jsonLines = @() foreach ($line in ($Text -split "`r?`n")) { $trimmed = $line.Trim() if ($trimmed.StartsWith("{") -and $trimmed.EndsWith("}")) { $jsonLines += ($trimmed | ConvertFrom-Json) } } return $jsonLines } } function Get-OpenSessionId { param( [string]$Client, [object]$Json ) switch ($Client) { "dotnet" { return $Json.sessionId } "go" { return $Json.reply.sessionId } "rust" { return $Json.sessionId } "python" { return $Json.sessionId } "java" { return $Json.reply.sessionId } } } function Get-ServerHandle { param( [string]$Client, [object]$Json ) switch ($Client) { "dotnet" { return [int]$Json.register.serverHandle } "go" { return [int]$Json.reply.register.serverHandle } "rust" { return [int]$Json.serverHandle } "python" { return [int]$Json.serverHandle } "java" { return [int]$Json.reply.register.serverHandle } } } function Get-ItemHandle { param( [string]$Client, [object]$Json ) switch ($Client) { "dotnet" { return [int]$Json.addItem.itemHandle } "go" { return [int]$Json.reply.addItem.itemHandle } "rust" { return [int]$Json.itemHandle } "python" { return [int]$Json.itemHandle } "java" { return [int]$Json.reply.addItem.itemHandle } } } function Get-StreamEventCount { param( [string]$Client, [object]$Json ) switch ($Client) { "dotnet" { return @($Json.events).Count } "go" { return @($Json).Count } "rust" { return [int]$Json.eventCount } "python" { return @($Json.events).Count } "java" { return @($Json).Count } } } # Returns the per-event objects from a stream-events reply so the write # round-trip can inspect their values. Mirrors Get-StreamEventCount: .NET, # Rust, and Python aggregate under `events`; Go and Java emit one event # object per line (Read-JsonObject collapses NDJSON into an array). function Get-StreamEvents { param( [string]$Client, [object]$Json ) switch ($Client) { "dotnet" { return @($Json.events) } "go" { return @($Json) } "rust" { return @($Json.events) } "python" { return @($Json.events) } "java" { return @($Json) } } } # Counts the messages in a stream-alarms reply. The CLIs shape the aggregate # JSON differently: .NET nests them under `alarms`, Rust under `messages` with # a `messageCount`, Python under `messages`; Go and Java emit one AlarmFeedMessage # object per line (Read-JsonObject collapses NDJSON into a bare array). function Get-AlarmMessageCount { param( [string]$Client, [object]$Json ) switch ($Client) { "dotnet" { return @($Json.alarms).Count } "go" { return @($Json).Count } "rust" { return [int]$Json.messageCount } "python" { return @($Json.messages).Count } "java" { return @($Json).Count } } } function Get-PropertyValue { param( [object]$Object, [string[]]$Names ) if ($null -eq $Object) { return $null } foreach ($name in $Names) { $property = $Object.PSObject.Properties[$name] if ($null -ne $property) { return $property.Value } } return $null } # Extracts the item handle from a streamed event, tolerating camelCase # (protobuf-JSON) and snake_case (some MessageToDict shapes). function Get-EventItemHandle { param([object]$Event) $handle = Get-PropertyValue -Object $Event -Names @("itemHandle", "item_handle") if ($null -eq $handle) { return $null } return [int]$handle } # Extracts the event family as a string. All five CLIs render it as the # protobuf enum name (e.g. MX_EVENT_FAMILY_ON_WRITE_COMPLETE). function Get-EventFamily { param([object]$Event) return [string](Get-PropertyValue -Object $Event -Names @("family")) } # Extracts the scalar payload from a streamed event's MxValue as a string. # The MxValue oneof renders to one protobuf-JSON `*Value` key; all five # CLIs (after the Rust stream-events extension) emit the same key names. function Get-EventScalar { param([object]$Event) $value = Get-PropertyValue -Object $Event -Names @("value") if ($null -eq $value) { return $null } foreach ($key in @("boolValue", "int32Value", "int64Value", "floatValue", "doubleValue", "stringValue")) { $property = $value.PSObject.Properties[$key] if ($null -ne $property -and $null -ne $property.Value) { return [string]$property.Value } } return $null } # Compares a written value against an observed event scalar. Numeric values # are compared numerically (so 42 matches 42.0); everything else compares as # a trimmed, case-insensitive string. function Test-ValueEquals { param( [string]$Expected, [string]$Observed ) if ([string]::IsNullOrWhiteSpace($Observed)) { return $false } $expectedNumber = 0.0 $observedNumber = 0.0 if ([double]::TryParse($Expected, [ref]$expectedNumber) -and [double]::TryParse($Observed, [ref]$observedNumber)) { return $expectedNumber -eq $observedNumber } return [string]::Equals($Expected.Trim(), $Observed.Trim(), [StringComparison]::OrdinalIgnoreCase) } function Get-BulkResults { param( [string]$Client, [string]$Operation, [object]$Json ) if ($Client -in @("go", "rust", "python", "java")) { return @(Get-PropertyValue -Object $Json -Names @("results")) } # .NET emits the full MxCommandReply via protobuf JSON, with results # nested under a per-command field name. $replyName = switch ($Operation) { "subscribe-bulk" { "subscribeBulk" } "unsubscribe-bulk" { "unsubscribeBulk" } "read-bulk" { "readBulk" } "write-bulk" { "writeBulk" } "write2-bulk" { "write2Bulk" } "write-secured-bulk" { "writeSecuredBulk" } "write-secured2-bulk" { "writeSecured2Bulk" } default { $Operation } } $reply = Get-PropertyValue -Object $Json -Names @($replyName) return @(Get-PropertyValue -Object $reply -Names @("results")) } function Get-BulkItemHandles { param([object[]]$Results) return @($Results | ForEach-Object { [int](Get-PropertyValue -Object $_ -Names @("itemHandle", "item_handle")) } | Where-Object { $_ -gt 0 }) } function Assert-BulkResults { param( [string]$Client, [string]$Operation, [object[]]$Results, [int]$ExpectedCount ) if ($Results.Count -ne $ExpectedCount) { throw "$Client $Operation returned $($Results.Count) result(s); expected $ExpectedCount." } foreach ($result in $Results) { $success = Get-PropertyValue -Object $result -Names @("wasSuccessful", "was_successful") if ($null -ne $success -and -not [bool]$success) { $tagAddress = Get-PropertyValue -Object $result -Names @("tagAddress", "tag_address") $errorMessage = Get-PropertyValue -Object $result -Names @("errorMessage", "error_message") throw "$Client $Operation failed for '$tagAddress': $errorMessage" } } } # Builds the dotnet and Java client CLIs once up front and records the path to # each compiled artifact, so the long-lived `batch` process is launched from # the compiled exe / installed launcher without paying a `dotnet build` or # `gradle` step at flow time. The Go, Rust, and Python batch processes are # launched via `go run` / `cargo run` / `python -m`, which compile-or-start # once when that single per-client process starts. function Initialize-ClientBuilds { if ($Clients -contains "dotnet") { $cliProject = Join-Path $repoRoot "clients/dotnet/ZB.MOM.WW.MxGateway.Client.Cli/ZB.MOM.WW.MxGateway.Client.Cli.csproj" $script:dotnetCliExe = Join-Path $repoRoot ` "clients/dotnet/ZB.MOM.WW.MxGateway.Client.Cli/bin/Debug/net10.0/ZB.MOM.WW.MxGateway.Client.Cli.exe" if (-not $DryRun) { Write-Host "Building the .NET client CLI once: $cliProject" Invoke-NativeCommand -FilePath "dotnet" ` -Arguments @("build", $cliProject, "-c", "Debug", "--nologo", "-v", "quiet") ` -WorkingDirectory $repoRoot | Out-Null if (-not (Test-Path $script:dotnetCliExe)) { throw "The .NET client CLI build did not produce '$script:dotnetCliExe'." } } } if ($Clients -contains "java") { $script:javaCliBat = Join-Path $repoRoot ` "clients/java/zb-mom-ww-mxgateway-cli/build/install/zb-mom-ww-mxgateway-cli/bin/zb-mom-ww-mxgateway-cli.bat" if (-not $DryRun) { $gradleCommand = Get-Command "gradle.bat", "gradle.cmd", "gradle.exe", "gradle" ` -ErrorAction SilentlyContinue | Select-Object -First 1 if ($null -eq $gradleCommand) { throw "The 'gradle' command was not found on PATH; the Java client e2e flow requires Gradle." } Write-Host "Installing the Java client CLI once via :zb-mom-ww-mxgateway-cli:installDist" Invoke-NativeCommand -FilePath "cmd.exe" ` -Arguments @("/c", $gradleCommand.Source, "--quiet", ":zb-mom-ww-mxgateway-cli:installDist") ` -WorkingDirectory (Join-Path $repoRoot "clients/java") | Out-Null if (-not (Test-Path $script:javaCliBat)) { throw "The Java client CLI install did not produce '$script:javaCliBat'." } } } } function Get-ClientCommand { param( [string]$Client, [string]$Operation, [hashtable]$Values, # The environment variable the client reads the API key from. Defaults # to the run-wide -ApiKeyEnv; the auth phase overrides it to drive a # missing-key or insufficient-scope rejection. [string]$ApiKeyEnvName = $ApiKeyEnv ) $httpEndpoint = ConvertTo-HttpEndpoint -Value $Endpoint $hostEndpoint = ConvertTo-HostEndpoint -Value $Endpoint $clientName = "mxgw-$Client-e2e" $streamMaxEvents = if ($Values.ContainsKey("maxEvents")) { [int]$Values.maxEvents } else { $EventLimit } # Python's stream-events call ends on a wall-clock timeout; give it enough # headroom to drain a large write-echo budget. $pythonStreamTimeout = [Math]::Max(15, [int][Math]::Ceiling($streamMaxEvents / 4.0)) switch ($Client) { "dotnet" { $arguments = @( $Operation, "--endpoint", $httpEndpoint, "--api-key-env", $ApiKeyEnvName, "--timeout", "60s", "--json" ) if ($Operation -eq "open-session") { $arguments += @("--client-name", $clientName) } elseif ($Operation -eq "register") { $arguments += @("--session-id", $Values.sessionId, "--client-name", $clientName) } elseif ($Operation -eq "add-item") { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item", $Values.item) } elseif ($Operation -eq "advise") { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)") } elseif ($Operation -eq "subscribe-bulk") { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items) } elseif ($Operation -eq "unsubscribe-bulk") { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles) } elseif ($Operation -eq "read-bulk") { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items) if ($Values.ContainsKey("timeoutMs")) { $arguments += @("--timeout-ms", "$($Values.timeoutMs)") } } elseif ($Operation -eq "write-bulk") { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles, "--type", $Values.valueType, "--values", $Values.values) if ($Values.ContainsKey("userId")) { $arguments += @("--user-id", "$($Values.userId)") } } elseif ($Operation -eq "write") { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--type", $Values.valueType, "--value", $Values.value) } elseif ($Operation -eq "stream-events") { $arguments += @("--session-id", $Values.sessionId, "--max-events", "$streamMaxEvents") } elseif ($Operation -eq "stream-alarms") { $arguments += @("--max-events", "$streamMaxEvents") if ($Values.ContainsKey("filterPrefix")) { $arguments += @("--filter-prefix", $Values.filterPrefix) } } elseif ($Operation -eq "acknowledge-alarm") { $arguments += @("--reference", $Values.alarmReference) if ($Values.ContainsKey("comment")) { $arguments += @("--comment", $Values.comment) } if ($Values.ContainsKey("operator")) { $arguments += @("--operator", $Values.operator) } } elseif ($Operation -eq "close-session") { $arguments += @("--session-id", $Values.sessionId) } return [pscustomobject]@{ file = $script:dotnetCliExe; args = $arguments; cwd = $repoRoot; env = @{} } } "go" { $arguments = @( "run", "./cmd/mxgw-go", $Operation, "-endpoint", $hostEndpoint, "-api-key-env", $ApiKeyEnvName, "-plaintext", "-json" ) if ($Operation -eq "open-session") { $arguments += @("-client-session-name", $clientName) } elseif ($Operation -eq "register") { $arguments += @("-session-id", $Values.sessionId, "-client-name", $clientName) } elseif ($Operation -eq "add-item") { $arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-item", $Values.item) } elseif ($Operation -eq "advise") { $arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-item-handle", "$($Values.itemHandle)") } elseif ($Operation -eq "subscribe-bulk") { $arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-items", $Values.items) } elseif ($Operation -eq "unsubscribe-bulk") { $arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-item-handles", $Values.itemHandles) } elseif ($Operation -eq "read-bulk") { $arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-items", $Values.items) if ($Values.ContainsKey("timeoutMs")) { $arguments += @("-timeout-ms", "$($Values.timeoutMs)") } } elseif ($Operation -eq "write-bulk") { $arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-item-handles", $Values.itemHandles, "-type", $Values.valueType, "-values", $Values.values) if ($Values.ContainsKey("userId")) { $arguments += @("-user-id", "$($Values.userId)") } } elseif ($Operation -eq "write") { $arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-item-handle", "$($Values.itemHandle)", "-type", $Values.valueType, "-value", $Values.value) } elseif ($Operation -eq "stream-events") { $arguments += @("-session-id", $Values.sessionId, "-limit", "$streamMaxEvents") } elseif ($Operation -eq "stream-alarms") { $arguments += @("-limit", "$streamMaxEvents") if ($Values.ContainsKey("filterPrefix")) { $arguments += @("-filter-prefix", $Values.filterPrefix) } } elseif ($Operation -eq "acknowledge-alarm") { $arguments += @("-reference", $Values.alarmReference) if ($Values.ContainsKey("comment")) { $arguments += @("-comment", $Values.comment) } if ($Values.ContainsKey("operator")) { $arguments += @("-operator", $Values.operator) } } elseif ($Operation -eq "close-session") { $arguments += @("-session-id", $Values.sessionId) } return [pscustomobject]@{ file = "go"; args = $arguments; cwd = (Join-Path $repoRoot "clients/go"); env = @{} } } "rust" { $arguments = @( "run", "-p", "mxgw-cli", "--", $Operation, "--endpoint", $httpEndpoint, "--api-key-env", $ApiKeyEnvName, "--json" ) if ($Operation -eq "open-session") { $arguments += @("--client-name", $clientName) } elseif ($Operation -eq "register") { $arguments += @("--session-id", $Values.sessionId, "--client-name", $clientName) } elseif ($Operation -eq "add-item") { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item", $Values.item) } elseif ($Operation -eq "advise") { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)") } elseif ($Operation -eq "subscribe-bulk") { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items) } elseif ($Operation -eq "unsubscribe-bulk") { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles) } elseif ($Operation -eq "read-bulk") { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items) if ($Values.ContainsKey("timeoutMs")) { $arguments += @("--timeout-ms", "$($Values.timeoutMs)") } } elseif ($Operation -eq "write-bulk") { # Rust uses --value-type for the type flag. $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles, "--value-type", $Values.valueType, "--values", $Values.values) if ($Values.ContainsKey("userId")) { $arguments += @("--user-id", "$($Values.userId)") } } elseif ($Operation -eq "write") { # Rust names the type flag --value-type, unlike the other CLIs. $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--value-type", $Values.valueType, "--value", $Values.value) } elseif ($Operation -eq "stream-events") { $arguments += @("--session-id", $Values.sessionId, "--max-events", "$streamMaxEvents") } elseif ($Operation -eq "stream-alarms") { $arguments += @("--max-events", "$streamMaxEvents") if ($Values.ContainsKey("filterPrefix")) { $arguments += @("--filter-prefix", $Values.filterPrefix) } } elseif ($Operation -eq "acknowledge-alarm") { $arguments += @("--reference", $Values.alarmReference) if ($Values.ContainsKey("comment")) { $arguments += @("--comment", $Values.comment) } if ($Values.ContainsKey("operator")) { $arguments += @("--operator", $Values.operator) } } elseif ($Operation -eq "close-session") { $arguments += @("--session-id", $Values.sessionId) } return [pscustomobject]@{ file = "cargo"; args = $arguments; cwd = (Join-Path $repoRoot "clients/rust"); env = @{} } } "python" { $arguments = @( "-m", "mxgateway_cli", $Operation, "--endpoint", $hostEndpoint, "--api-key-env", $ApiKeyEnvName, "--plaintext", "--json" ) if ($Operation -eq "open-session") { $arguments += @("--client-name", $clientName) } elseif ($Operation -eq "register") { $arguments += @("--session-id", $Values.sessionId, "--client-name", $clientName) } elseif ($Operation -eq "add-item") { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item", $Values.item) } elseif ($Operation -eq "advise") { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)") } elseif ($Operation -eq "subscribe-bulk") { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items) } elseif ($Operation -eq "unsubscribe-bulk") { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles) } elseif ($Operation -eq "read-bulk") { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items) if ($Values.ContainsKey("timeoutMs")) { $arguments += @("--timeout-ms", "$($Values.timeoutMs)") } } elseif ($Operation -eq "write-bulk") { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles, "--type", $Values.valueType, "--values", $Values.values) if ($Values.ContainsKey("userId")) { $arguments += @("--user-id", "$($Values.userId)") } } elseif ($Operation -eq "write") { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--type", $Values.valueType, "--value", $Values.value) } elseif ($Operation -eq "stream-events") { $arguments += @("--session-id", $Values.sessionId, "--max-events", "$streamMaxEvents", "--timeout", "$pythonStreamTimeout") } elseif ($Operation -eq "stream-alarms") { $arguments += @("--max-messages", "$streamMaxEvents", "--timeout", "$pythonStreamTimeout") if ($Values.ContainsKey("filterPrefix")) { $arguments += @("--filter-prefix", $Values.filterPrefix) } } elseif ($Operation -eq "acknowledge-alarm") { $arguments += @("--reference", $Values.alarmReference) if ($Values.ContainsKey("comment")) { $arguments += @("--comment", $Values.comment) } if ($Values.ContainsKey("operator")) { $arguments += @("--operator", $Values.operator) } } elseif ($Operation -eq "close-session") { $arguments += @("--session-id", $Values.sessionId) } $env = @{ PYTHONPATH = Join-Path $repoRoot "clients/python/src" } return [pscustomobject]@{ file = "py"; args = @("-3.12") + $arguments; cwd = (Join-Path $repoRoot "clients/python"); env = $env } } "java" { $cliArgs = @( $Operation, "--endpoint", $hostEndpoint, "--api-key-env", $ApiKeyEnvName, "--plaintext", "--json" ) if ($Operation -eq "open-session") { $cliArgs += @("--client-session-name", $clientName) } elseif ($Operation -eq "register") { $cliArgs += @("--session-id", $Values.sessionId, "--client-name", $clientName) } elseif ($Operation -eq "add-item") { $cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item", $Values.item) } elseif ($Operation -eq "advise") { $cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)") } elseif ($Operation -eq "subscribe-bulk") { $cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items) } elseif ($Operation -eq "unsubscribe-bulk") { $cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles) } elseif ($Operation -eq "read-bulk") { $cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items) if ($Values.ContainsKey("timeoutMs")) { $cliArgs += @("--timeout-ms", "$($Values.timeoutMs)") } } elseif ($Operation -eq "write-bulk") { $cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles, "--type", $Values.valueType, "--values", $Values.values) if ($Values.ContainsKey("userId")) { $cliArgs += @("--user-id", "$($Values.userId)") } } elseif ($Operation -eq "write") { $cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--type", $Values.valueType, "--value", $Values.value) } elseif ($Operation -eq "stream-events") { $cliArgs += @("--session-id", $Values.sessionId, "--limit", "$streamMaxEvents") } elseif ($Operation -eq "stream-alarms") { $cliArgs += @("--limit", "$streamMaxEvents") if ($Values.ContainsKey("filterPrefix")) { $cliArgs += @("--filter-prefix", $Values.filterPrefix) } } elseif ($Operation -eq "acknowledge-alarm") { $cliArgs += @("--reference", $Values.alarmReference) if ($Values.ContainsKey("comment")) { $cliArgs += @("--comment", $Values.comment) } if ($Values.ContainsKey("operator")) { $cliArgs += @("--operator", $Values.operator) } } elseif ($Operation -eq "close-session") { $cliArgs += @("--session-id", $Values.sessionId) } # The Java CLI is installed once up front (gradle # :zb-mom-ww-mxgateway-cli:installDist) so each call runs the generated # launcher script directly instead of paying Gradle configuration # plus a JVM cold-start per invocation. .NET's Process.Start # (UseShellExecute=false) cannot launch a .bat directly, so the # launcher runs through cmd.exe. return [pscustomobject]@{ file = "cmd.exe" args = @("/c", $script:javaCliBat) + $cliArgs cwd = (Join-Path $repoRoot "clients/java") env = @{} } } } } # Synthesizes a dry-run JSON reply for an operation so the flow can be # validated without a live gateway. function Get-DryRunReply { param( [string]$Client, [string]$Operation, [hashtable]$Values ) switch ($Operation) { "open-session" { return [pscustomobject]@{ sessionId = "dry-run-session-$Client"; reply = [pscustomobject]@{ sessionId = "dry-run-session-$Client" } } } "register" { return [pscustomobject]@{ serverHandle = 1; register = [pscustomobject]@{ serverHandle = 1 }; reply = [pscustomobject]@{ register = [pscustomobject]@{ serverHandle = 1 } } } } "add-item" { return [pscustomobject]@{ itemHandle = 1; addItem = [pscustomobject]@{ itemHandle = 1 }; reply = [pscustomobject]@{ addItem = [pscustomobject]@{ itemHandle = 1 } } } } "write" { return [pscustomobject]@{ ok = $true; operation = "write"; reply = [pscustomobject]@{} } } "subscribe-bulk" { $results = @($Values.items -split "," | ForEach-Object -Begin { $index = 1 } -Process { [pscustomobject]@{ itemHandle = $index++; tagAddress = $_; wasSuccessful = $true } }) return [pscustomobject]@{ subscribeBulk = [pscustomobject]@{ results = $results }; results = $results } } "unsubscribe-bulk" { $results = @($Values.itemHandles -split "," | ForEach-Object { [pscustomobject]@{ itemHandle = [int]$_; wasSuccessful = $true } }) return [pscustomobject]@{ unsubscribeBulk = [pscustomobject]@{ results = $results }; results = $results } } "read-bulk" { $results = @($Values.items -split "," | ForEach-Object -Begin { $index = 1 } -Process { [pscustomobject]@{ itemHandle = $index++ tagAddress = $_ wasSuccessful = $true wasCached = $true } }) return [pscustomobject]@{ readBulk = [pscustomobject]@{ results = $results }; results = $results } } "write-bulk" { $results = @($Values.itemHandles -split "," | ForEach-Object { [pscustomobject]@{ itemHandle = [int]$_; wasSuccessful = $true } }) return [pscustomobject]@{ writeBulk = [pscustomobject]@{ results = $results }; results = $results } } "stream-events" { # Synthesize an OnDataChange (carrying the written value) and an # OnWriteComplete so the write round-trip assertion passes under # -DryRun. The reply is shaped per client: Go and Java emit one # event object per line (Read-JsonObject collapses NDJSON to a # bare array), the others aggregate the events under `events`. $itemHandle = if ($Values.ContainsKey("echoItemHandle")) { [int]$Values.echoItemHandle } else { 1 } $echoValue = if ($Values.ContainsKey("echoValue")) { $Values.echoValue } else { 1 } $dataEvent = [pscustomobject]@{ workerSequence = 1 family = "MX_EVENT_FAMILY_ON_DATA_CHANGE" itemHandle = $itemHandle value = [pscustomobject]@{ int32Value = $echoValue } } $writeCompleteEvent = [pscustomobject]@{ workerSequence = 2 family = "MX_EVENT_FAMILY_ON_WRITE_COMPLETE" itemHandle = $itemHandle } $events = @($dataEvent, $writeCompleteEvent) switch ($Client) { "go" { return ,$events } "java" { return ,$events } "rust" { return [pscustomobject]@{ eventCount = $events.Count; events = $events } } default { return [pscustomobject]@{ events = $events } } } } "stream-alarms" { # Synthesize an active-alarm snapshot followed by the # snapshot-complete sentinel. The reply is shaped per client: # Go and Java emit one message object per line (Read-JsonObject # collapses NDJSON to a bare array), Rust aggregates under # `messages` with a `messageCount`, Python under `messages`, and # .NET under `alarms`. $activeAlarm = [pscustomobject]@{ activeAlarm = [pscustomobject]@{ alarmFullReference = "Galaxy!TestArea.TestMachine_001.TestAlarm001" currentState = "ALARM_CONDITION_STATE_ACTIVE" severity = 500 } } $snapshotComplete = [pscustomobject]@{ snapshotComplete = $true } $messages = @($activeAlarm, $snapshotComplete) switch ($Client) { "go" { return ,$messages } "java" { return ,$messages } "rust" { return [pscustomobject]@{ messageCount = $messages.Count; messages = $messages } } "dotnet" { return [pscustomobject]@{ alarms = $messages } } default { return [pscustomobject]@{ messages = $messages } } } } "acknowledge-alarm" { return [pscustomobject]@{ rawReply = [pscustomobject]@{ hresult = 0; diagnosticMessage = "dry-run ack" } reply = [pscustomobject]@{ hresult = 0 } } } default { return [pscustomobject]@{ ok = $true; reply = [pscustomobject]@{} } } } } # --- Batch-mode client process --------------------------------------------- # The e2e flow issues ~250 operations per client. Spawning one CLI process per # operation pays a process — and, for the JVM, a runtime — cold-start every # time. Instead each client CLI exposes a `batch` subcommand: a single # long-lived process that reads one command line from stdin, runs it, writes # the JSON result, then a line containing exactly $batchTerminator. The harness # drives that one process per client, so startup is paid once. $script:batchTerminator = "__MXGW_BATCH_EOR__" $script:currentBatchClient = $null # A redirected child's StandardInput writer is created with Console.InputEncoding, # which is UTF-8 *with a BOM* on this host. The writer then prepends that BOM to # the first bytes it sends, and the CLIs parse it into their first argument. # Switching the console input encoding to a BOM-less encoding before any batch # process starts makes that writer BOM-free. The e2e command lines are ASCII. try { [Console]::InputEncoding = [System.Text.Encoding]::ASCII } catch { Write-Warning "Could not set a BOM-less console input encoding: $($_.Exception.Message)" } # Derives the `batch`-process launch spec from Get-ClientCommand: the launch # prefix is whatever precedes the operation token (e.g. `run -p mxgw-cli --`), # with the operation itself replaced by `batch`. function Get-BatchLaunchSpec { param([string]$Client) $command = Get-ClientCommand -Client $Client -Operation "open-session" -Values @{} -ApiKeyEnvName $ApiKeyEnv $argList = [object[]]$command.args $operationIndex = [Array]::IndexOf($argList, "open-session") if ($operationIndex -lt 0) { throw "Cannot locate the operation token in the '$Client' command line." } $prefix = if ($operationIndex -gt 0) { @($argList[0..($operationIndex - 1)]) } else { @() } return [pscustomobject]@{ file = $command.file args = @($prefix + "batch") cwd = $command.cwd env = $command.env } } # Returns just the operation arguments (operation token + flags) for a client # command, stripping the launch prefix — this is the line written to the batch # process for one operation. function Get-ClientOperationArgs { param( [string]$Client, [string]$Operation, [hashtable]$Values, [string]$ApiKeyEnvName = $ApiKeyEnv ) $command = Get-ClientCommand -Client $Client -Operation $Operation -Values $Values -ApiKeyEnvName $ApiKeyEnvName $argList = [object[]]$command.args $operationIndex = [Array]::IndexOf($argList, $Operation) if ($operationIndex -le 0) { return @($argList) } return @($argList[$operationIndex..($argList.Count - 1)]) } # True when a parsed command reply is the CLI's failure envelope rather than a # normal result. All five CLIs emit a top-level `error` field on failure. function Test-OperationFailed { param([object]$Json) if ($null -eq $Json) { return $true } $errorValue = Get-PropertyValue -Object $Json -Names @("error") return -not [string]::IsNullOrEmpty([string]$errorValue) } # Starts the long-lived `batch` process for a client and returns a handle # carrying the process and its redirected stdin/stdout streams. function Start-BatchClient { param([string]$Client) $spec = Get-BatchLaunchSpec -Client $Client $startInfo = [System.Diagnostics.ProcessStartInfo]::new() $startInfo.FileName = $spec.file $startInfo.Arguments = ($spec.args | ForEach-Object { ConvertTo-NativeArgument -Value $_ }) -join " " $startInfo.WorkingDirectory = $spec.cwd $startInfo.RedirectStandardInput = $true $startInfo.RedirectStandardOutput = $true # stderr is left attached to the console: the CLIs only log diagnostics # there, and not redirecting it removes any risk of the child blocking on a # full stderr pipe while the harness reads stdout. $startInfo.RedirectStandardError = $false $startInfo.UseShellExecute = $false foreach ($entry in $spec.env.GetEnumerator()) { $startInfo.Environment[$entry.Key] = [string]$entry.Value } $process = [System.Diagnostics.Process]::new() $process.StartInfo = $startInfo [void]$process.Start() return [pscustomobject]@{ client = $Client; process = $process; input = $process.StandardInput } } # Sends one operation to a batch process and returns its raw JSON output text # (everything written before the terminator line). function Invoke-BatchOperation { param( [pscustomobject]$BatchClient, [string]$Client, [string]$Operation, [hashtable]$Values, [string]$ApiKeyEnvName ) $operationArgs = Get-ClientOperationArgs -Client $Client -Operation $Operation ` -Values $Values -ApiKeyEnvName $ApiKeyEnvName $process = $BatchClient.process $BatchClient.input.WriteLine(($operationArgs -join " ")) $BatchClient.input.Flush() $builder = [System.Text.StringBuilder]::new() while ($true) { $line = $process.StandardOutput.ReadLine() if ($null -eq $line) { throw ("Batch client '$Client' closed its output before terminating operation " + "'$Operation' (process exited: $($process.HasExited)).") } if ($line -eq $script:batchTerminator) { break } [void]$builder.AppendLine($line) } return $builder.ToString() } # Signals end-of-input to a batch process and waits for it to exit. function Stop-BatchClient { param([pscustomobject]$BatchClient) if ($null -eq $BatchClient) { return } $process = $BatchClient.process try { if (-not $process.HasExited) { $BatchClient.input.Close() if (-not $process.WaitForExit(15000)) { $process.Kill($true) } } } catch { try { $process.Kill($true) } catch { } } finally { $process.Dispose() } } function Invoke-ClientOperation { param( [string]$Client, [string]$Operation, [hashtable]$Values = @{}, [string]$ApiKeyEnvName = $ApiKeyEnv ) if ($DryRun) { $operationArgs = Get-ClientOperationArgs -Client $Client -Operation $Operation ` -Values $Values -ApiKeyEnvName $ApiKeyEnvName Write-Host "[dry-run] (batch:$Client) $($operationArgs -join ' ')" return Get-DryRunReply -Client $Client -Operation $Operation -Values $Values } $stdout = Invoke-BatchOperation -BatchClient $script:currentBatchClient -Client $Client ` -Operation $Operation -Values $Values -ApiKeyEnvName $ApiKeyEnvName $json = Read-JsonObject -Text $stdout if (Test-OperationFailed -Json $json) { $errorValue = Get-PropertyValue -Object $json -Names @("error") throw "$Client $Operation failed: $errorValue" } return $json } # Runs a client operation that is expected to fail. Returns a record whose # `failed` flag is true when the CLI reported its failure envelope. Under # -DryRun a synthetic failure is returned so the parity and auth phases can be # exercised offline. function Invoke-ClientOperationExpectingFailure { param( [string]$Client, [string]$Operation, [hashtable]$Values = @{}, [string]$ApiKeyEnvName = $ApiKeyEnv ) if ($DryRun) { $operationArgs = Get-ClientOperationArgs -Client $Client -Operation $Operation ` -Values $Values -ApiKeyEnvName $ApiKeyEnvName Write-Host "[dry-run] (batch:$Client) $($operationArgs -join ' ')" return [pscustomobject]@{ failed = $true; json = $null } } $stdout = Invoke-BatchOperation -BatchClient $script:currentBatchClient -Client $Client ` -Operation $Operation -Values $Values -ApiKeyEnvName $ApiKeyEnvName $json = Read-JsonObject -Text $stdout return [pscustomobject]@{ failed = (Test-OperationFailed -Json $json); json = $json } } # Connects a short-lived StreamEvents consumer so the gateway empties the worker # event channel. The per-tag advise loop advises every discovered tag with no # consumer attached; without periodic draining the worker event channel # (MxGateway:Events:QueueCapacity) overflows under FailFast backpressure and # faults the worker. # # A small bounded read is enough: the gateway's per-stream producer # (EventStreamService.ProduceEventsAsync) races ahead of the CLI and pulls the # entire worker event channel into its own buffer the instant a subscriber # attaches, so the channel is emptied long before the CLI finishes reading # these events. Run via the expecting-failure path so the drain's exit code is # ignored — its purpose is the side effect (emptying the channel), not output. function Invoke-EventDrain { param( [string]$Client, [string]$SessionId ) Invoke-ClientOperationExpectingFailure -Client $Client -Operation "stream-events" -Values @{ sessionId = $SessionId maxEvents = 200 } | Out-Null } # Runs the full e2e flow for a single language client and returns the result # record. Discovered tags are passed in so the (slow) SQL discovery runs once. function Invoke-ClientFlow { param( [string]$Client, [object[]]$Tags ) Write-Host "Running $Client client e2e flow against $($Tags.Count) discovered tags." $sessionId = $null $serverHandle = $null $clientResult = [ordered]@{ language = $Client sessionId = $null serverHandle = $null bulk = $null addedItems = @() eventCount = 0 write = $null alarms = $null parity = @() auth = @() closed = $false error = $null } try { if (-not $DryRun) { $script:currentBatchClient = Start-BatchClient -Client $Client } $openJson = Invoke-ClientOperation -Client $Client -Operation "open-session" $sessionId = Get-OpenSessionId -Client $Client -Json $openJson if ([string]::IsNullOrWhiteSpace($sessionId)) { throw "The $Client open-session command did not return a session id." } $clientResult.sessionId = $sessionId $registerJson = Invoke-ClientOperation -Client $Client -Operation "register" -Values @{ sessionId = $sessionId } $serverHandle = Get-ServerHandle -Client $Client -Json $registerJson $clientResult.serverHandle = $serverHandle # --- Write round-trip + value assertion --------------------------- # Runs right after register, before the bulk and add-item phases, so # only a small backlog of events precedes the write. The gateway # replays the per-session event buffer from the start, so the # post-write OnWriteComplete must be reachable within the bounded # -WriteEchoMaxEvents window. if ($VerifyWrite) { $writeTag = @($Tags | Where-Object { $_.attributeName -eq $WriteAttribute }) | Select-Object -First 1 if ($null -eq $writeTag) { Write-Warning "$Client write phase skipped: no discovered tag has attribute '$WriteAttribute'." } else { $writeAddJson = Invoke-ClientOperation -Client $Client -Operation "add-item" -Values @{ sessionId = $sessionId serverHandle = $serverHandle item = $writeTag.fullTagReference } $writeItemHandle = Get-ItemHandle -Client $Client -Json $writeAddJson Invoke-ClientOperation -Client $Client -Operation "advise" -Values @{ sessionId = $sessionId serverHandle = $serverHandle itemHandle = $writeItemHandle } | Out-Null $sentinelValue = "$($WriteValueBase + $script:clientFlowIndex)" Invoke-ClientOperation -Client $Client -Operation "write" -Values @{ sessionId = $sessionId serverHandle = $serverHandle itemHandle = $writeItemHandle valueType = $WriteType value = $sentinelValue } | Out-Null $writeStreamJson = Invoke-ClientOperation -Client $Client -Operation "stream-events" -Values @{ sessionId = $sessionId maxEvents = $WriteEchoMaxEvents echoItemHandle = $writeItemHandle echoValue = $sentinelValue } $writeEvents = @(Get-StreamEvents -Client $Client -Json $writeStreamJson) $writeItemEvents = @($writeEvents | Where-Object { (Get-EventItemHandle -Event $_) -eq $writeItemHandle }) # The reliable write round-trip signal: MXAccess fires # OnWriteComplete once the write reaches the provider. The # value echo is best-effort — a provider-driven attribute # (e.g. a simulated counter) accepts the write but does not # hold the value, so no OnDataChange carries it back. $writeCompleteEvent = $writeItemEvents | Where-Object { (Get-EventFamily -Event $_) -match "WRITE_COMPLETE" } | Select-Object -First 1 $echoEvent = $writeItemEvents | Where-Object { Test-ValueEquals -Expected $sentinelValue -Observed (Get-EventScalar -Event $_) } | Select-Object -First 1 if ($null -eq $writeCompleteEvent) { throw ("$Client write round-trip failed: wrote $WriteType=$sentinelValue to " + "'$($writeTag.fullTagReference)' (item handle $writeItemHandle) but no " + "OnWriteComplete event was observed in $($writeEvents.Count) streamed event(s). " + "Increase -WriteEchoMaxEvents, or drop -VerifyWrite.") } $clientResult.write = [ordered]@{ attributeName = $WriteAttribute fullTagReference = $writeTag.fullTagReference itemHandle = $writeItemHandle valueType = $WriteType value = $sentinelValue writeCompleteObserved = $true echoObserved = ($null -ne $echoEvent) } # WriteBulk smoke: single-entry batch against the same writable # tag. Exercises the BulkWriteResult wire format end-to-end # without complicating the OnWriteComplete echo assertion that # the single-item write phase already verified above. Pinned # to a different sentinel value so a subsequent read-bulk # against the same tag would see the bulk write's effect. if (-not $SkipReadWriteBulk) { $bulkSentinel = $sentinelValue + 1 $writeBulkJson = Invoke-ClientOperation -Client $Client -Operation "write-bulk" -Values @{ sessionId = $sessionId serverHandle = $serverHandle itemHandles = "$writeItemHandle" valueType = $WriteType values = "$bulkSentinel" userId = 0 } $writeBulkResults = @(Get-BulkResults -Client $Client -Operation "write-bulk" -Json $writeBulkJson) Assert-BulkResults -Client $Client -Operation "write-bulk" -Results $writeBulkResults -ExpectedCount 1 $clientResult.write.writeBulkValue = $bulkSentinel $clientResult.write.writeBulkResultCount = $writeBulkResults.Count } } } if (-not $SkipBulk) { $bulkTags = @($Tags | Select-Object -First ([Math]::Min($BulkTagCount, $Tags.Count))) $bulkItems = ($bulkTags | ForEach-Object { $_.fullTagReference }) -join "," $subscribeBulkJson = Invoke-ClientOperation -Client $Client -Operation "subscribe-bulk" -Values @{ sessionId = $sessionId serverHandle = $serverHandle items = $bulkItems } $subscribeResults = @(Get-BulkResults -Client $Client -Operation "subscribe-bulk" -Json $subscribeBulkJson) Assert-BulkResults -Client $Client -Operation "subscribe-bulk" -Results $subscribeResults -ExpectedCount $bulkTags.Count $bulkItemHandles = @(Get-BulkItemHandles -Results $subscribeResults) if ($bulkItemHandles.Count -ne $bulkTags.Count) { throw "$Client subscribe-bulk returned $($bulkItemHandles.Count) usable item handle(s); expected $($bulkTags.Count)." } # ReadBulk over the already-advised tags: every result must come # from the per-session value cache (was_cached = true). Confirms # the gateway/worker/cache wiring serves cached values for tags # the caller did not create the subscription for. $readBulkSummary = $null if (-not $SkipReadWriteBulk) { $readBulkJson = Invoke-ClientOperation -Client $Client -Operation "read-bulk" -Values @{ sessionId = $sessionId serverHandle = $serverHandle items = $bulkItems timeoutMs = 1500 } $readResults = @(Get-BulkResults -Client $Client -Operation "read-bulk" -Json $readBulkJson) Assert-BulkResults -Client $Client -Operation "read-bulk" -Results $readResults -ExpectedCount $bulkTags.Count $cachedCount = @($readResults | Where-Object { [bool](Get-PropertyValue -Object $_ -Names @("wasCached", "was_cached")) }).Count # Allow up to one snapshot fallback per batch: a freshly # advised tag may not have an OnDataChange cached yet if it # hasn't pushed an update in the small window between # subscribe-bulk and read-bulk. Anything beyond that means # the cached-path optimization is broken. $maxSnapshotFallbacks = 1 if ($cachedCount -lt ($readResults.Count - $maxSnapshotFallbacks)) { throw ("$Client read-bulk only returned $cachedCount cached result(s) " + "out of $($readResults.Count); the cache-then-snapshot fork must " + "serve cached values for already-advised tags.") } $readBulkSummary = [ordered]@{ tagCount = $readResults.Count cachedCount = $cachedCount } } $unsubscribeBulkJson = Invoke-ClientOperation -Client $Client -Operation "unsubscribe-bulk" -Values @{ sessionId = $sessionId serverHandle = $serverHandle itemHandles = $bulkItemHandles -join "," } $unsubscribeResults = @(Get-BulkResults -Client $Client -Operation "unsubscribe-bulk" -Json $unsubscribeBulkJson) Assert-BulkResults -Client $Client -Operation "unsubscribe-bulk" -Results $unsubscribeResults -ExpectedCount $bulkItemHandles.Count $clientResult.bulk = [ordered]@{ tagCount = $bulkTags.Count subscribedCount = $subscribeResults.Count unsubscribedCount = $unsubscribeResults.Count itemHandles = $bulkItemHandles readBulk = $readBulkSummary } } $advisedSinceDrain = 0 foreach ($tag in $Tags) { $addJson = Invoke-ClientOperation -Client $Client -Operation "add-item" -Values @{ sessionId = $sessionId serverHandle = $serverHandle item = $tag.fullTagReference } $itemHandle = Get-ItemHandle -Client $Client -Json $addJson Invoke-ClientOperation -Client $Client -Operation "advise" -Values @{ sessionId = $sessionId serverHandle = $serverHandle itemHandle = $itemHandle } | Out-Null $clientResult.addedItems += [ordered]@{ tagName = $tag.tagName attributeName = $tag.attributeName fullTagReference = $tag.fullTagReference itemHandle = $itemHandle protectedWriteRequired = $tag.attributeName -eq "ProtectedValue" } # Drain the worker event channel every DrainEveryTags advised tags # so this unbounded advise loop cannot overflow it and fault the # worker before the loop completes. $advisedSinceDrain++ if ($DrainEveryTags -gt 0 -and $advisedSinceDrain -ge $DrainEveryTags) { Invoke-EventDrain -Client $Client -SessionId $sessionId $advisedSinceDrain = 0 } } # --- Event streaming ---------------------------------------------- if (-not $SkipStream) { $streamJson = Invoke-ClientOperation -Client $Client -Operation "stream-events" -Values @{ sessionId = $sessionId } $clientResult.eventCount = Get-StreamEventCount -Client $Client -Json $streamJson if ($clientResult.eventCount -lt 1) { throw "The $Client stream-events command returned no events." } } # --- Alarm feed + acknowledge ------------------------------------- # Session-less RPCs against the gateway's always-on central alarm # monitor. Opt-in (-VerifyAlarms) because it needs the monitor enabled # (MxGateway:Alarms:Enabled) and a live alarm provider. if ($VerifyAlarms) { $alarmStreamJson = Invoke-ClientOperation -Client $Client -Operation "stream-alarms" -Values @{ maxEvents = $AlarmStreamMax } $alarmMessageCount = Get-AlarmMessageCount -Client $Client -Json $alarmStreamJson if ($alarmMessageCount -lt 1) { throw "The $Client stream-alarms command returned no alarm-feed messages." } # The acknowledge round-trips against the central monitor; the # native ack outcome depends on whether the referenced alarm is # currently active, so only the RPC's success is asserted here. Invoke-ClientOperation -Client $Client -Operation "acknowledge-alarm" -Values @{ alarmReference = $AlarmReference comment = "e2e-matrix" operator = "mxgw-e2e" } | Out-Null $clientResult.alarms = [ordered]@{ streamMessageCount = $alarmMessageCount acknowledgeReference = $AlarmReference acknowledged = $true } } # --- Error-path (parity) checks ----------------------------------- # MXAccess parity: an invalid item handle and an unknown session must # both be rejected rather than silently succeeding. if (-not $SkipParity) { $parityChecks = @( [ordered]@{ check = "invalid-item-handle" operation = "advise" values = @{ sessionId = $sessionId; serverHandle = $serverHandle; itemHandle = 2147483647 } }, [ordered]@{ check = "unknown-session" operation = "register" values = @{ sessionId = [guid]::NewGuid().ToString() } } ) foreach ($parityCheck in $parityChecks) { $parityResult = Invoke-ClientOperationExpectingFailure ` -Client $Client -Operation $parityCheck.operation -Values $parityCheck.values $passed = [bool]$parityResult.failed $clientResult.parity += [ordered]@{ check = $parityCheck.check operation = $parityCheck.operation passed = $passed } if (-not $passed) { throw "$Client parity check '$($parityCheck.check)' expected $($parityCheck.operation) to fail but it exited 0." } } } # --- API-key auth rejection --------------------------------------- # Runs after a working session is established, so a non-zero exit is # an auth rejection rather than the gateway being unreachable. if (-not $SkipAuth) { $authChecks = @( [ordered]@{ check = "missing-api-key"; apiKeyEnv = $script:missingKeyEnvName } ) if (-not [string]::IsNullOrWhiteSpace($RejectScopeApiKeyEnv)) { $authChecks += [ordered]@{ check = "insufficient-scope"; apiKeyEnv = $RejectScopeApiKeyEnv } } foreach ($authCheck in $authChecks) { $authResult = Invoke-ClientOperationExpectingFailure ` -Client $Client -Operation "open-session" -ApiKeyEnvName $authCheck.apiKeyEnv $passed = [bool]$authResult.failed $clientResult.auth += [ordered]@{ check = $authCheck.check passed = $passed } if (-not $passed) { throw "$Client auth check '$($authCheck.check)' expected open-session to be rejected but it exited 0." } } } } catch { $clientResult.error = $_.Exception.Message Write-Warning "$Client e2e flow failed: $($clientResult.error)" } finally { if (-not [string]::IsNullOrWhiteSpace($sessionId)) { try { Invoke-ClientOperation -Client $Client -Operation "close-session" -Values @{ sessionId = $sessionId } | Out-Null $clientResult.closed = $true } catch { $clientResult.error = "$($clientResult.error) close-session failed: $($_.Exception.Message)" } } if ($null -ne $script:currentBatchClient) { Stop-BatchClient -BatchClient $script:currentBatchClient $script:currentBatchClient = $null } } return $clientResult } # Forwards every run parameter to a single-client child invocation used by # -Parallel. -Parallel itself is intentionally omitted so the child runs the # serial path. function Get-ChildArgumentList { param( [string]$Client, [string]$ChildReportPath ) $childArgs = @( "-NoProfile", "-ExecutionPolicy", "Bypass", "-File", $PSCommandPath, "-Clients", $Client, "-MachineStart", "$MachineStart", "-MachineEnd", "$MachineEnd", "-Attributes", ($Attributes -join ","), "-Endpoint", $Endpoint, "-ApiKeyEnv", $ApiKeyEnv, "-SqlServer", $SqlServer, "-Database", $Database, "-EventLimit", "$EventLimit", "-BulkTagCount", "$BulkTagCount", "-DrainEveryTags", "$DrainEveryTags", "-WriteAttribute", $WriteAttribute, "-WriteType", $WriteType, "-WriteValueBase", "$WriteValueBase", "-WriteEchoMaxEvents", "$WriteEchoMaxEvents", "-AlarmReference", $AlarmReference, "-AlarmStreamMax", "$AlarmStreamMax", "-ReportPath", $ChildReportPath, "-EmitReport" ) if (-not [string]::IsNullOrWhiteSpace($RejectScopeApiKeyEnv)) { $childArgs += @("-RejectScopeApiKeyEnv", $RejectScopeApiKeyEnv) } if ($SkipStream) { $childArgs += "-SkipStream" } if ($SkipBulk) { $childArgs += "-SkipBulk" } if ($VerifyWrite) { $childArgs += "-VerifyWrite" } if ($VerifyAlarms) { $childArgs += "-VerifyAlarms" } if ($SkipParity) { $childArgs += "-SkipParity" } if ($SkipAuth) { $childArgs += "-SkipAuth" } if ($DryRun) { $childArgs += "-DryRun" } return $childArgs } # An env var name that is guaranteed not to be set in this process, used to # drive the missing-API-key auth rejection. $script:missingKeyEnvName = "MXGW_E2E_MISSING_KEY_" + ([guid]::NewGuid().ToString("N")) # --- Parallel mode: fan out one isolated child process per client ---------- if ($Parallel -and $Clients.Count -gt 1) { Write-Host "Running $($Clients.Count) client e2e flows in parallel." $childRoot = Join-Path ([System.IO.Path]::GetTempPath()) ("mxgw-e2e-" + ([guid]::NewGuid().ToString("N"))) New-Item -ItemType Directory -Path $childRoot -Force | Out-Null $children = @() foreach ($client in $Clients) { $childReport = Join-Path $childRoot "$client.json" $childLog = Join-Path $childRoot "$client.log" $childArgs = Get-ChildArgumentList -Client $client -ChildReportPath $childReport $process = Start-Process -FilePath "pwsh" -ArgumentList $childArgs ` -RedirectStandardOutput $childLog -RedirectStandardError "$childLog.err" ` -NoNewWindow -PassThru $children += [pscustomobject]@{ client = $client process = $process report = $childReport log = $childLog } } foreach ($child in $children) { $child.process.WaitForExit() } $mergedClients = @() $discoveredTags = @() $hadFailure = $false foreach ($child in $children) { foreach ($logPath in @($child.log, "$($child.log).err")) { if ((Test-Path $logPath) -and -not [string]::IsNullOrWhiteSpace((Get-Content -Raw -Path $logPath))) { Write-Host "----- $($child.client) -----" Get-Content -Path $logPath | ForEach-Object { Write-Host $_ } } } if ($child.process.ExitCode -ne 0) { $hadFailure = $true } if (Test-Path $child.report) { $childRun = Get-Content -Raw -Path $child.report | ConvertFrom-Json $mergedClients += @($childRun.clients) if ($discoveredTags.Count -eq 0) { $discoveredTags = @($childRun.discoveredTags) } } else { $hadFailure = $true $mergedClients += [pscustomobject]@{ language = $child.client error = "Child process exited $($child.process.ExitCode) without writing a report." } } } $run = [ordered]@{ schemaVersion = 1 endpoint = $Endpoint apiKeyEnv = $ApiKeyEnv machineStart = $MachineStart machineEnd = $MachineEnd attributes = $Attributes eventLimit = $EventLimit bulkTagCount = $BulkTagCount drainEveryTags = $DrainEveryTags skipStream = [bool]$SkipStream skipBulk = [bool]$SkipBulk verifyWrite = [bool]$VerifyWrite verifyAlarms = [bool]$VerifyAlarms skipParity = [bool]$SkipParity skipAuth = [bool]$SkipAuth writeAttribute = $WriteAttribute parallel = $true discoveredTags = $discoveredTags clients = $mergedClients completedAt = (Get-Date).ToUniversalTime().ToString("O") success = -not $hadFailure } $reportDirectory = Split-Path -Parent $ReportPath if (-not [string]::IsNullOrWhiteSpace($reportDirectory)) { New-Item -ItemType Directory -Path $reportDirectory -Force | Out-Null } $run | ConvertTo-Json -Depth 8 | Set-Content -Path $ReportPath -Encoding UTF8 Write-Host "Wrote merged e2e report to $ReportPath" Remove-Item -Path $childRoot -Recurse -Force -ErrorAction SilentlyContinue if ($hadFailure) { exit 1 } return } # --- Serial mode ----------------------------------------------------------- Initialize-ClientBuilds $discoveryJson = & $discoveryScript ` -MachineStart $MachineStart ` -MachineEnd $MachineEnd ` -Attributes $Attributes ` -SqlServer $SqlServer ` -Database $Database ` -Json $convertedTags = $discoveryJson | ConvertFrom-Json $tags = @($convertedTags) if ($tags.Count -eq 1 -and $tags[0] -is [System.Array]) { $tags = @($tags[0]) } $expectedTagCount = (($MachineEnd - $MachineStart + 1) * $Attributes.Count) if ($tags.Count -ne $expectedTagCount) { $found = $tags | Group-Object -Property tagName | ForEach-Object { "$($_.Name)=$($_.Count)" } throw "Expected $expectedTagCount discovered test tags, found $($tags.Count): $($found -join ', ')" } $run = [ordered]@{ schemaVersion = 1 endpoint = $Endpoint apiKeyEnv = $ApiKeyEnv machineStart = $MachineStart machineEnd = $MachineEnd attributes = $Attributes eventLimit = $EventLimit bulkTagCount = $BulkTagCount drainEveryTags = $DrainEveryTags skipStream = [bool]$SkipStream skipBulk = [bool]$SkipBulk verifyWrite = [bool]$VerifyWrite verifyAlarms = [bool]$VerifyAlarms skipParity = [bool]$SkipParity skipAuth = [bool]$SkipAuth writeAttribute = $WriteAttribute parallel = $false startedAt = (Get-Date).ToUniversalTime().ToString("O") discoveredTags = $tags clients = @() } $hadFailure = $false $script:clientFlowIndex = 0 foreach ($client in $Clients) { $clientResult = Invoke-ClientFlow -Client $client -Tags $tags if (-not [string]::IsNullOrWhiteSpace($clientResult.error)) { $hadFailure = $true } $run.clients += $clientResult $script:clientFlowIndex++ } $run.completedAt = (Get-Date).ToUniversalTime().ToString("O") $run.success = -not $hadFailure if (-not $DryRun -or $EmitReport) { $reportDirectory = Split-Path -Parent $ReportPath if (-not [string]::IsNullOrWhiteSpace($reportDirectory)) { New-Item -ItemType Directory -Path $reportDirectory -Force | Out-Null } $run | ConvertTo-Json -Depth 8 | Set-Content -Path $ReportPath -Encoding UTF8 Write-Host "Wrote e2e report to $ReportPath" } if ($hadFailure) { exit 1 }