diff --git a/clients/rust/crates/mxgw-cli/src/main.rs b/clients/rust/crates/mxgw-cli/src/main.rs index 994c91b..6f71836 100644 --- a/clients/rust/crates/mxgw-cli/src/main.rs +++ b/clients/rust/crates/mxgw-cli/src/main.rs @@ -17,12 +17,12 @@ use clap::{Args, Parser, Subcommand, ValueEnum}; use futures_util::StreamExt; use mxgateway_client::generated::galaxy_repository::v1::DeployEvent; use mxgateway_client::generated::mxaccess_gateway::v1::{ - CloseSessionRequest, MxCommand, MxCommandKind, MxCommandRequest, OpenSessionRequest, - PingCommand, StreamEventsRequest, + CloseSessionRequest, MxCommand, MxCommandKind, MxCommandRequest, MxEvent, + MxValue as ProtoMxValue, OpenSessionRequest, PingCommand, StreamEventsRequest, }; use mxgateway_client::{ - ApiKey, ClientOptions, Error, GalaxyClient, GatewayClient, MxValue, CLIENT_VERSION, - GATEWAY_PROTOCOL_VERSION, WORKER_PROTOCOL_VERSION, + ApiKey, ClientOptions, Error, GalaxyClient, GatewayClient, MxValue, MxValueProjection, + CLIENT_VERSION, GATEWAY_PROTOCOL_VERSION, WORKER_PROTOCOL_VERSION, }; use serde_json::json; use serde_json::Value; @@ -451,7 +451,7 @@ async fn run(cli: Cli) -> Result<(), Error> { after_worker_sequence, }) .await?; - let mut events = Vec::new(); + let mut events: Vec = Vec::new(); let mut event_count = 0usize; while event_count < max_events { let Some(event) = stream.next().await else { @@ -460,21 +460,17 @@ async fn run(cli: Cli) -> Result<(), Error> { let event = event?; event_count += 1; if jsonl { - println!( - "{}", - json!({ - "workerSequence": event.worker_sequence, - "family": event.family, - }) - ); + println!("{}", event_to_json(&event)); } else if json { - events.push(event); + events.push(event_to_json(&event)); } else { println!("{} {}", event.worker_sequence, event.family); } } if json { - println!("{}", json!({ "eventCount": event_count })); + // `eventCount` is preserved for back-compat; `events` carries + // the per-event detail the cross-language e2e matrix compares. + println!("{}", json!({ "eventCount": event_count, "events": events })); } } Command::Write { @@ -841,6 +837,44 @@ fn print_deploy_event(event: &DeployEvent, use_json: bool) { } } +/// Render a streamed [`MxEvent`] as a JSON object. The scalar value is +/// projected into protojson-style `*Value` keys so the cross-language e2e +/// matrix can extract and compare event values uniformly across all five +/// client CLIs. +fn event_to_json(event: &MxEvent) -> Value { + json!({ + "family": event.family, + "sessionId": event.session_id, + "serverHandle": event.server_handle, + "itemHandle": event.item_handle, + "quality": event.quality, + "workerSequence": event.worker_sequence, + "value": event.value.as_ref().map(event_value_to_json), + }) +} + +/// Project an [`MxValue`] into a protojson-shaped JSON object whose single +/// key names the scalar kind (`int32Value`, `stringValue`, ...), matching +/// the protobuf-JSON the .NET/Go/Java CLIs emit. +fn event_value_to_json(value: &ProtoMxValue) -> Value { + match MxValue::from_proto(value.clone()).projection() { + MxValueProjection::Bool(inner) => json!({ "boolValue": inner }), + MxValueProjection::Int32(inner) => json!({ "int32Value": inner }), + // protojson renders 64-bit integers as strings; mirror that here. + MxValueProjection::Int64(inner) => json!({ "int64Value": inner.to_string() }), + MxValueProjection::Float(inner) => json!({ "floatValue": inner }), + MxValueProjection::Double(inner) => json!({ "doubleValue": inner }), + MxValueProjection::String(inner) => json!({ "stringValue": inner }), + MxValueProjection::Timestamp(ts) => { + json!({ "timestampValue": { "seconds": ts.seconds, "nanos": ts.nanos } }) + } + MxValueProjection::Array(_) => json!({ "arrayValue": {} }), + MxValueProjection::Raw(bytes) => json!({ "rawValue": { "byteCount": bytes.len() } }), + MxValueProjection::Null => json!({ "isNull": true }), + MxValueProjection::Unset => Value::Null, + } +} + /// Parse a small but practically-complete subset of RFC3339: /// `YYYY-MM-DDTHH:MM:SS[.fffffffff][Z|+HH:MM|-HH:MM]`. Returns the /// corresponding `prost_types::Timestamp` (Unix seconds + nanoseconds). diff --git a/docs/GatewayTesting.md b/docs/GatewayTesting.md index 1c53808..3768f4f 100644 --- a/docs/GatewayTesting.md +++ b/docs/GatewayTesting.md @@ -171,11 +171,28 @@ powershell -ExecutionPolicy Bypass -File scripts/discover-testmachine-tags.ps1 - ``` `scripts/run-client-e2e-tests.ps1` drives the .NET, Go, Rust, Python, and Java -client CLIs through a live gateway session. For each client it opens one -session, registers, verifies `SubscribeBulk` and `UnsubscribeBulk` on a bounded -tag subset, adds and advises every discovered test tag, reads a bounded event -stream, then closes the session in a `finally` path. The script writes a JSON -report under `artifacts/e2e/`. +client CLIs through a live gateway session. The gateway and worker are assumed +to be already running at `-Endpoint`; the script does not start or stop them. +For each client it runs these phases, then closes the session in a `finally` +path and writes a JSON report under `artifacts/e2e/`: + +1. **Session + register** — opens one session and registers. +2. **Bulk** — verifies `SubscribeBulk` / `UnsubscribeBulk` on a bounded tag + subset (skip with `-SkipBulk`). +3. **Add-item / advise** — adds and advises every discovered test tag. +4. **Write round-trip** — writes a per-client sentinel value to a configurable + writable attribute (`-WriteAttribute`, default `TestChangingInt`), then + asserts the same value is echoed back through the event stream. Skip with + `-SkipWrite`. The Rust `stream-events` CLI emits full per-event JSON + (`itemHandle` + `value`) so all five clients run an identical value compare. +5. **Stream** — asserts a bounded event stream delivers at least one event + (skip with `-SkipStream`). +6. **Parity** — asserts MXAccess error paths are rejected rather than silently + succeeding: an invalid item handle and an unknown session id (skip with + `-SkipParity`). +7. **Auth rejection** — asserts `open-session` is rejected when the API key is + missing, and (when `-RejectScopeApiKeyEnv` names an insufficient-scope key) + when the key lacks the required scope. Skip with `-SkipAuth`. Build the gateway and worker, start the gateway, and provide a valid API key before running the client e2e script: @@ -192,9 +209,24 @@ powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -Clien powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -BulkTagCount 10 powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -SkipStream powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -SkipBulk +# Write round-trip: point at a writable scalar attribute and its value type. +powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -WriteAttribute TestChangingInt -WriteType int32 +powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -SkipWrite +# Auth rejection: also assert an insufficient-scope key is denied. +powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -RejectScopeApiKeyEnv MXGATEWAY_READONLY_API_KEY +# Run all five clients concurrently as isolated child processes. +powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -Parallel +# Validate the flow offline (prints commands, contacts no gateway). +powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -DryRun powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -Endpoint localhost:5000 -ApiKeyEnv MXGATEWAY_API_KEY ``` +The write round-trip fails loudly if `-WriteAttribute` does not name a writable +scalar attribute, if the write is rejected, or if the sentinel value is not +observed within `-WriteEchoMaxEvents` (default 200) streamed events. Point +`-WriteAttribute` at a stable writable attribute, raise `-WriteEchoMaxEvents`, +or pass `-SkipWrite` if no suitable attribute is deployed. + ## Focused Commands Run the cross-language smoke matrix tests after changing the documented client diff --git a/scripts/run-client-e2e-tests.ps1 b/scripts/run-client-e2e-tests.ps1 index a5f2865..47274f8 100644 --- a/scripts/run-client-e2e-tests.ps1 +++ b/scripts/run-client-e2e-tests.ps1 @@ -1,3 +1,17 @@ +<# +.SYNOPSIS +Cross-language client e2e matrix for the MXAccess Gateway. + +.DESCRIPTION +Drives the .NET, Go, Rust, Python, and Java client CLIs against a running +gateway + worker. For each language the script exercises session open/close, +register, bulk subscribe/unsubscribe, per-tag add-item/advise, event +streaming, a write round-trip with value assertion, error-path (parity) +checks, and API-key auth rejection. + +The gateway and worker are assumed to be already running at -Endpoint; the +script does not start or stop them. +#> [CmdletBinding()] param( [string[]]$Clients = @("dotnet", "go", "rust", "python", "java"), @@ -19,8 +33,27 @@ param( [int]$BulkTagCount = 6, [switch]$SkipStream, [switch]$SkipBulk, + # Write round-trip + value assertion. + [switch]$SkipWrite, + [string]$WriteAttribute = "TestChangingInt", + [string]$WriteType = "int32", + [int]$WriteValueBase = 424200, + [int]$WriteEchoMaxEvents = 200, + # Error-path (parity) checks. + [switch]$SkipParity, + # API-key auth rejection checks. + [switch]$SkipAuth, + # Optional env var holding an API key whose scopes are insufficient for + # open-session; when supplied the auth phase also asserts that key is + # rejected (PermissionDenied) on top of the always-on missing-key check. + [string]$RejectScopeApiKeyEnv, + # Run each language client concurrently as an isolated child process. + [switch]$Parallel, [switch]$DryRun, - [string]$ReportPath + [string]$ReportPath, + # Internal: set by -Parallel on each spawned child so it always writes its + # report (even under -DryRun) for the parent to merge. Not for direct use. + [switch]$EmitReport ) Set-StrictMode -Version Latest @@ -56,6 +89,10 @@ if ($BulkTagCount -lt 1) { throw "BulkTagCount must be greater than zero." } +if ($WriteEchoMaxEvents -lt 1) { + throw "WriteEchoMaxEvents must be greater than zero." +} + foreach ($client in $Clients) { if ($validClients -notcontains $client) { throw "Unsupported client '$client'. Supported clients: $($validClients -join ', ')." @@ -116,7 +153,10 @@ function Invoke-NativeCommand { [string]$FilePath, [string[]]$Arguments, [string]$WorkingDirectory, - [hashtable]$Environment = @{} + [hashtable]$Environment = @{}, + # When set, a non-zero exit code is returned to the caller instead of + # throwing. Used by the parity and auth phases, which expect failure. + [switch]$AllowFailure ) $process = [System.Diagnostics.Process]::new() @@ -155,7 +195,7 @@ function Invoke-NativeCommand { stderr = $stderr } - if ($result.exitCode -ne 0) { + if ($result.exitCode -ne 0 -and -not $AllowFailure) { throw "Command failed with exit code $($result.exitCode): $commandLine`n$stderr`n$stdout" } @@ -243,6 +283,25 @@ function Get-StreamEventCount { } } +# Returns the per-event objects from a stream-events reply so the write +# round-trip can inspect their values. Mirrors Get-StreamEventCount: .NET, +# Rust, and Python aggregate under `events`; Go and Java emit one event +# object per line (Read-JsonObject collapses NDJSON into an array). +function Get-StreamEvents { + param( + [string]$Client, + [object]$Json + ) + + switch ($Client) { + "dotnet" { return @($Json.events) } + "go" { return @($Json) } + "rust" { return @($Json.events) } + "python" { return @($Json.events) } + "java" { return @($Json) } + } +} + function Get-PropertyValue { param( [object]$Object, @@ -263,6 +322,63 @@ function Get-PropertyValue { return $null } +# Extracts the item handle from a streamed event, tolerating camelCase +# (protobuf-JSON) and snake_case (some MessageToDict shapes). +function Get-EventItemHandle { + param([object]$Event) + + $handle = Get-PropertyValue -Object $Event -Names @("itemHandle", "item_handle") + if ($null -eq $handle) { + return $null + } + + return [int]$handle +} + +# Extracts the scalar payload from a streamed event's MxValue as a string. +# The MxValue oneof renders to one protobuf-JSON `*Value` key; all five +# CLIs (after the Rust stream-events extension) emit the same key names. +function Get-EventScalar { + param([object]$Event) + + $value = Get-PropertyValue -Object $Event -Names @("value") + if ($null -eq $value) { + return $null + } + + foreach ($key in @("boolValue", "int32Value", "int64Value", "floatValue", "doubleValue", "stringValue")) { + $property = $value.PSObject.Properties[$key] + if ($null -ne $property -and $null -ne $property.Value) { + return [string]$property.Value + } + } + + return $null +} + +# Compares a written value against an observed event scalar. Numeric values +# are compared numerically (so 42 matches 42.0); everything else compares as +# a trimmed, case-insensitive string. +function Test-ValueEquals { + param( + [string]$Expected, + [string]$Observed + ) + + if ([string]::IsNullOrWhiteSpace($Observed)) { + return $false + } + + $expectedNumber = 0.0 + $observedNumber = 0.0 + if ([double]::TryParse($Expected, [ref]$expectedNumber) -and + [double]::TryParse($Observed, [ref]$observedNumber)) { + return $expectedNumber -eq $observedNumber + } + + return [string]::Equals($Expected.Trim(), $Observed.Trim(), [StringComparison]::OrdinalIgnoreCase) +} + function Get-BulkResults { param( [string]$Client, @@ -315,12 +431,20 @@ function Get-ClientCommand { param( [string]$Client, [string]$Operation, - [hashtable]$Values + [hashtable]$Values, + # The environment variable the client reads the API key from. Defaults + # to the run-wide -ApiKeyEnv; the auth phase overrides it to drive a + # missing-key or insufficient-scope rejection. + [string]$ApiKeyEnvName = $ApiKeyEnv ) $httpEndpoint = ConvertTo-HttpEndpoint -Value $Endpoint $hostEndpoint = ConvertTo-HostEndpoint -Value $Endpoint $clientName = "mxgw-$Client-e2e" + $streamMaxEvents = if ($Values.ContainsKey("maxEvents")) { [int]$Values.maxEvents } else { $EventLimit } + # Python's stream-events call ends on a wall-clock timeout; give it enough + # headroom to drain a large write-echo budget. + $pythonStreamTimeout = [Math]::Max(15, [int][Math]::Ceiling($streamMaxEvents / 4.0)) switch ($Client) { "dotnet" { @@ -328,7 +452,7 @@ function Get-ClientCommand { "run", "--project", "clients/dotnet/MxGateway.Client.Cli", "--", $Operation, "--endpoint", $httpEndpoint, - "--api-key-env", $ApiKeyEnv, + "--api-key-env", $ApiKeyEnvName, "--timeout", "60s", "--json" ) @@ -344,8 +468,10 @@ function Get-ClientCommand { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items) } elseif ($Operation -eq "unsubscribe-bulk") { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles) + } elseif ($Operation -eq "write") { + $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--type", $Values.valueType, "--value", $Values.value) } elseif ($Operation -eq "stream-events") { - $arguments += @("--session-id", $Values.sessionId, "--max-events", "$EventLimit") + $arguments += @("--session-id", $Values.sessionId, "--max-events", "$streamMaxEvents") } elseif ($Operation -eq "close-session") { $arguments += @("--session-id", $Values.sessionId) } @@ -355,7 +481,7 @@ function Get-ClientCommand { $arguments = @( "run", "./cmd/mxgw-go", $Operation, "-endpoint", $hostEndpoint, - "-api-key-env", $ApiKeyEnv, + "-api-key-env", $ApiKeyEnvName, "-plaintext", "-json" ) @@ -371,8 +497,10 @@ function Get-ClientCommand { $arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-items", $Values.items) } elseif ($Operation -eq "unsubscribe-bulk") { $arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-item-handles", $Values.itemHandles) + } elseif ($Operation -eq "write") { + $arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-item-handle", "$($Values.itemHandle)", "-type", $Values.valueType, "-value", $Values.value) } elseif ($Operation -eq "stream-events") { - $arguments += @("-session-id", $Values.sessionId, "-limit", "$EventLimit") + $arguments += @("-session-id", $Values.sessionId, "-limit", "$streamMaxEvents") } elseif ($Operation -eq "close-session") { $arguments += @("-session-id", $Values.sessionId) } @@ -382,7 +510,7 @@ function Get-ClientCommand { $arguments = @( "run", "-p", "mxgw-cli", "--", $Operation, "--endpoint", $httpEndpoint, - "--api-key-env", $ApiKeyEnv, + "--api-key-env", $ApiKeyEnvName, "--json" ) if ($Operation -eq "open-session") { @@ -397,8 +525,11 @@ function Get-ClientCommand { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items) } elseif ($Operation -eq "unsubscribe-bulk") { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles) + } elseif ($Operation -eq "write") { + # Rust names the type flag --value-type, unlike the other CLIs. + $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--value-type", $Values.valueType, "--value", $Values.value) } elseif ($Operation -eq "stream-events") { - $arguments += @("--session-id", $Values.sessionId, "--max-events", "$EventLimit") + $arguments += @("--session-id", $Values.sessionId, "--max-events", "$streamMaxEvents") } elseif ($Operation -eq "close-session") { $arguments += @("--session-id", $Values.sessionId) } @@ -408,7 +539,7 @@ function Get-ClientCommand { $arguments = @( "-m", "mxgateway_cli", $Operation, "--endpoint", $hostEndpoint, - "--api-key-env", $ApiKeyEnv, + "--api-key-env", $ApiKeyEnvName, "--plaintext", "--json" ) @@ -424,8 +555,10 @@ function Get-ClientCommand { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items) } elseif ($Operation -eq "unsubscribe-bulk") { $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles) + } elseif ($Operation -eq "write") { + $arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--type", $Values.valueType, "--value", $Values.value) } elseif ($Operation -eq "stream-events") { - $arguments += @("--session-id", $Values.sessionId, "--max-events", "$EventLimit", "--timeout", "15") + $arguments += @("--session-id", $Values.sessionId, "--max-events", "$streamMaxEvents", "--timeout", "$pythonStreamTimeout") } elseif ($Operation -eq "close-session") { $arguments += @("--session-id", $Values.sessionId) } @@ -438,7 +571,7 @@ function Get-ClientCommand { $cliArgs = @( $Operation, "--endpoint", $hostEndpoint, - "--api-key-env", $ApiKeyEnv, + "--api-key-env", $ApiKeyEnvName, "--plaintext", "--json" ) @@ -454,8 +587,10 @@ function Get-ClientCommand { $cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--items", $Values.items) } elseif ($Operation -eq "unsubscribe-bulk") { $cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handles", $Values.itemHandles) + } elseif ($Operation -eq "write") { + $cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--type", $Values.valueType, "--value", $Values.value) } elseif ($Operation -eq "stream-events") { - $cliArgs += @("--session-id", $Values.sessionId, "--limit", "$EventLimit") + $cliArgs += @("--session-id", $Values.sessionId, "--limit", "$streamMaxEvents") } elseif ($Operation -eq "close-session") { $cliArgs += @("--session-id", $Values.sessionId) } @@ -465,43 +600,475 @@ function Get-ClientCommand { } } +# Synthesizes a dry-run JSON reply for an operation so the flow can be +# validated without a live gateway. +function Get-DryRunReply { + param( + [string]$Client, + [string]$Operation, + [hashtable]$Values + ) + + switch ($Operation) { + "open-session" { return [pscustomobject]@{ sessionId = "dry-run-session-$Client"; reply = [pscustomobject]@{ sessionId = "dry-run-session-$Client" } } } + "register" { return [pscustomobject]@{ serverHandle = 1; register = [pscustomobject]@{ serverHandle = 1 }; reply = [pscustomobject]@{ register = [pscustomobject]@{ serverHandle = 1 } } } } + "add-item" { return [pscustomobject]@{ itemHandle = 1; addItem = [pscustomobject]@{ itemHandle = 1 }; reply = [pscustomobject]@{ addItem = [pscustomobject]@{ itemHandle = 1 } } } } + "write" { return [pscustomobject]@{ ok = $true; operation = "write"; reply = [pscustomobject]@{} } } + "subscribe-bulk" { + $results = @($Values.items -split "," | ForEach-Object -Begin { $index = 1 } -Process { + [pscustomobject]@{ itemHandle = $index++; tagAddress = $_; wasSuccessful = $true } + }) + return [pscustomobject]@{ subscribeBulk = [pscustomobject]@{ results = $results }; results = $results } + } + "unsubscribe-bulk" { + $results = @($Values.itemHandles -split "," | ForEach-Object { + [pscustomobject]@{ itemHandle = [int]$_; wasSuccessful = $true } + }) + return [pscustomobject]@{ unsubscribeBulk = [pscustomobject]@{ results = $results }; results = $results } + } + "stream-events" { + # Echo the requested write value back so the write round-trip + # assertion passes under -DryRun. The reply is shaped per client: + # Go and Java emit one event object per line (Read-JsonObject + # collapses NDJSON to a bare array), the others aggregate the + # events under an `events` property. + $itemHandle = if ($Values.ContainsKey("echoItemHandle")) { [int]$Values.echoItemHandle } else { 1 } + $echoValue = if ($Values.ContainsKey("echoValue")) { $Values.echoValue } else { 1 } + $event = [pscustomobject]@{ + workerSequence = 1 + itemHandle = $itemHandle + value = [pscustomobject]@{ int32Value = $echoValue } + } + switch ($Client) { + "go" { return ,@($event) } + "java" { return ,@($event) } + "rust" { return [pscustomobject]@{ eventCount = 1; events = @($event) } } + default { return [pscustomobject]@{ events = @($event) } } + } + } + default { return [pscustomobject]@{ ok = $true; reply = [pscustomobject]@{} } } + } +} + function Invoke-ClientOperation { param( [string]$Client, [string]$Operation, - [hashtable]$Values = @{} + [hashtable]$Values = @{}, + [string]$ApiKeyEnvName = $ApiKeyEnv ) - $command = Get-ClientCommand -Client $Client -Operation $Operation -Values $Values + $command = Get-ClientCommand -Client $Client -Operation $Operation -Values $Values -ApiKeyEnvName $ApiKeyEnvName $result = Invoke-NativeCommand ` -FilePath $command.file ` -Arguments $command.args ` -WorkingDirectory $command.cwd ` -Environment $command.env if ($DryRun) { - switch ($Operation) { - "open-session" { return [pscustomobject]@{ sessionId = "dry-run-session-$Client"; reply = [pscustomobject]@{ sessionId = "dry-run-session-$Client" } } } - "register" { return [pscustomobject]@{ serverHandle = 1; register = [pscustomobject]@{ serverHandle = 1 }; reply = [pscustomobject]@{ register = [pscustomobject]@{ serverHandle = 1 } } } } - "add-item" { return [pscustomobject]@{ itemHandle = 1; addItem = [pscustomobject]@{ itemHandle = 1 }; reply = [pscustomobject]@{ addItem = [pscustomobject]@{ itemHandle = 1 } } } } - "subscribe-bulk" { - $results = @($Values.items -split "," | ForEach-Object -Begin { $index = 1 } -Process { - [pscustomobject]@{ itemHandle = $index++; tagAddress = $_; wasSuccessful = $true } - }) - return [pscustomobject]@{ subscribeBulk = [pscustomobject]@{ results = $results }; results = $results } - } - "unsubscribe-bulk" { - $results = @($Values.itemHandles -split "," | ForEach-Object { - [pscustomobject]@{ itemHandle = [int]$_; wasSuccessful = $true } - }) - return [pscustomobject]@{ unsubscribeBulk = [pscustomobject]@{ results = $results }; results = $results } - } - "stream-events" { return [pscustomobject]@{ eventCount = 1; events = @([pscustomobject]@{ workerSequence = 1 }) } } - default { return [pscustomobject]@{ ok = $true; reply = [pscustomobject]@{} } } - } + return Get-DryRunReply -Client $Client -Operation $Operation -Values $Values } return Read-JsonObject -Text $result.stdout } +# Runs a client operation that is expected to fail, returning the raw process +# result (exit code + stderr) without throwing. Under -DryRun a synthetic +# failure is returned so the parity and auth phases can be exercised offline. +function Invoke-ClientOperationExpectingFailure { + param( + [string]$Client, + [string]$Operation, + [hashtable]$Values = @{}, + [string]$ApiKeyEnvName = $ApiKeyEnv + ) + + if ($DryRun) { + $command = Get-ClientCommand -Client $Client -Operation $Operation -Values $Values -ApiKeyEnvName $ApiKeyEnvName + Write-Host "[dry-run] $(Join-CommandLine -FilePath $command.file -Arguments $command.args)" + return [pscustomobject]@{ exitCode = 1; stdout = ""; stderr = "[dry-run] synthetic expected failure" } + } + + $command = Get-ClientCommand -Client $Client -Operation $Operation -Values $Values -ApiKeyEnvName $ApiKeyEnvName + return Invoke-NativeCommand ` + -FilePath $command.file ` + -Arguments $command.args ` + -WorkingDirectory $command.cwd ` + -Environment $command.env ` + -AllowFailure +} + +# Runs the full e2e flow for a single language client and returns the result +# record. Discovered tags are passed in so the (slow) SQL discovery runs once. +function Invoke-ClientFlow { + param( + [string]$Client, + [object[]]$Tags + ) + + Write-Host "Running $Client client e2e flow against $($Tags.Count) discovered tags." + $sessionId = $null + $serverHandle = $null + $clientResult = [ordered]@{ + language = $Client + sessionId = $null + serverHandle = $null + bulk = $null + addedItems = @() + eventCount = 0 + write = $null + parity = @() + auth = @() + closed = $false + error = $null + } + + try { + $openJson = Invoke-ClientOperation -Client $Client -Operation "open-session" + $sessionId = Get-OpenSessionId -Client $Client -Json $openJson + if ([string]::IsNullOrWhiteSpace($sessionId)) { + throw "The $Client open-session command did not return a session id." + } + $clientResult.sessionId = $sessionId + + $registerJson = Invoke-ClientOperation -Client $Client -Operation "register" -Values @{ + sessionId = $sessionId + } + $serverHandle = Get-ServerHandle -Client $Client -Json $registerJson + $clientResult.serverHandle = $serverHandle + + if (-not $SkipBulk) { + $bulkTags = @($Tags | Select-Object -First ([Math]::Min($BulkTagCount, $Tags.Count))) + $bulkItems = ($bulkTags | ForEach-Object { $_.fullTagReference }) -join "," + $subscribeBulkJson = Invoke-ClientOperation -Client $Client -Operation "subscribe-bulk" -Values @{ + sessionId = $sessionId + serverHandle = $serverHandle + items = $bulkItems + } + $subscribeResults = @(Get-BulkResults -Client $Client -Operation "subscribe-bulk" -Json $subscribeBulkJson) + Assert-BulkResults -Client $Client -Operation "subscribe-bulk" -Results $subscribeResults -ExpectedCount $bulkTags.Count + $bulkItemHandles = @(Get-BulkItemHandles -Results $subscribeResults) + if ($bulkItemHandles.Count -ne $bulkTags.Count) { + throw "$Client subscribe-bulk returned $($bulkItemHandles.Count) usable item handle(s); expected $($bulkTags.Count)." + } + + $unsubscribeBulkJson = Invoke-ClientOperation -Client $Client -Operation "unsubscribe-bulk" -Values @{ + sessionId = $sessionId + serverHandle = $serverHandle + itemHandles = $bulkItemHandles -join "," + } + $unsubscribeResults = @(Get-BulkResults -Client $Client -Operation "unsubscribe-bulk" -Json $unsubscribeBulkJson) + Assert-BulkResults -Client $Client -Operation "unsubscribe-bulk" -Results $unsubscribeResults -ExpectedCount $bulkItemHandles.Count + + $clientResult.bulk = [ordered]@{ + tagCount = $bulkTags.Count + subscribedCount = $subscribeResults.Count + unsubscribedCount = $unsubscribeResults.Count + itemHandles = $bulkItemHandles + } + } + + foreach ($tag in $Tags) { + $addJson = Invoke-ClientOperation -Client $Client -Operation "add-item" -Values @{ + sessionId = $sessionId + serverHandle = $serverHandle + item = $tag.fullTagReference + } + $itemHandle = Get-ItemHandle -Client $Client -Json $addJson + Invoke-ClientOperation -Client $Client -Operation "advise" -Values @{ + sessionId = $sessionId + serverHandle = $serverHandle + itemHandle = $itemHandle + } | Out-Null + + $clientResult.addedItems += [ordered]@{ + tagName = $tag.tagName + attributeName = $tag.attributeName + fullTagReference = $tag.fullTagReference + itemHandle = $itemHandle + protectedWriteRequired = $tag.attributeName -eq "ProtectedValue" + } + } + + # --- Write round-trip + value assertion --------------------------- + # Write a per-client sentinel value to a configured writable + # attribute, then assert it is echoed back through the event stream. + $writeTarget = $null + if (-not $SkipWrite) { + $writeTarget = @($clientResult.addedItems | Where-Object { + $_.attributeName -eq $WriteAttribute + }) | Select-Object -First 1 + } + + $doWrite = $null -ne $writeTarget + $sentinelValue = $null + if ($doWrite) { + $sentinelValue = "$($WriteValueBase + $script:clientFlowIndex)" + Invoke-ClientOperation -Client $Client -Operation "write" -Values @{ + sessionId = $sessionId + serverHandle = $serverHandle + itemHandle = $writeTarget.itemHandle + valueType = $WriteType + value = $sentinelValue + } | Out-Null + } elseif (-not $SkipWrite) { + Write-Warning "$Client write phase skipped: no discovered tag has attribute '$WriteAttribute'." + } + + # --- Event streaming (also serves the write echo assertion) ------- + $captureEvents = (-not $SkipStream) -or $doWrite + if ($captureEvents) { + $streamValues = @{ sessionId = $sessionId } + if ($doWrite) { + $streamValues.maxEvents = $WriteEchoMaxEvents + $streamValues.echoItemHandle = $writeTarget.itemHandle + $streamValues.echoValue = $sentinelValue + } + $streamJson = Invoke-ClientOperation -Client $Client -Operation "stream-events" -Values $streamValues + $events = @(Get-StreamEvents -Client $Client -Json $streamJson) + $clientResult.eventCount = Get-StreamEventCount -Client $Client -Json $streamJson + + if (-not $SkipStream -and $clientResult.eventCount -lt 1) { + throw "The $Client stream-events command returned no events." + } + + if ($doWrite) { + $echoEvent = $events | Where-Object { + (Get-EventItemHandle -Event $_) -eq $writeTarget.itemHandle -and + (Test-ValueEquals -Expected $sentinelValue -Observed (Get-EventScalar -Event $_)) + } | Select-Object -First 1 + + if ($null -eq $echoEvent) { + throw ("$Client write round-trip failed: wrote $WriteType=$sentinelValue to " + + "'$($writeTarget.fullTagReference)' (item handle $($writeTarget.itemHandle)) " + + "but no matching value was observed in $($events.Count) streamed event(s). " + + "Increase -WriteEchoMaxEvents, point -WriteAttribute at a writable attribute, or pass -SkipWrite.") + } + + $clientResult.write = [ordered]@{ + attributeName = $WriteAttribute + fullTagReference = $writeTarget.fullTagReference + itemHandle = $writeTarget.itemHandle + valueType = $WriteType + value = $sentinelValue + echoObserved = $true + echoWorkerSequence = (Get-PropertyValue -Object $echoEvent -Names @("workerSequence", "worker_sequence")) + } + } + } + + # --- Error-path (parity) checks ----------------------------------- + # MXAccess parity: an invalid item handle and an unknown session must + # both be rejected rather than silently succeeding. + if (-not $SkipParity) { + $parityChecks = @( + [ordered]@{ + check = "invalid-item-handle" + operation = "advise" + values = @{ sessionId = $sessionId; serverHandle = $serverHandle; itemHandle = 2147483647 } + }, + [ordered]@{ + check = "unknown-session" + operation = "register" + values = @{ sessionId = [guid]::NewGuid().ToString() } + } + ) + + foreach ($parityCheck in $parityChecks) { + $parityResult = Invoke-ClientOperationExpectingFailure ` + -Client $Client -Operation $parityCheck.operation -Values $parityCheck.values + $passed = $parityResult.exitCode -ne 0 + $clientResult.parity += [ordered]@{ + check = $parityCheck.check + operation = $parityCheck.operation + exitCode = $parityResult.exitCode + passed = $passed + } + if (-not $passed) { + throw "$Client parity check '$($parityCheck.check)' expected $($parityCheck.operation) to fail but it exited 0." + } + } + } + + # --- API-key auth rejection --------------------------------------- + # Runs after a working session is established, so a non-zero exit is + # an auth rejection rather than the gateway being unreachable. + if (-not $SkipAuth) { + $authChecks = @( + [ordered]@{ check = "missing-api-key"; apiKeyEnv = $script:missingKeyEnvName } + ) + if (-not [string]::IsNullOrWhiteSpace($RejectScopeApiKeyEnv)) { + $authChecks += [ordered]@{ check = "insufficient-scope"; apiKeyEnv = $RejectScopeApiKeyEnv } + } + + foreach ($authCheck in $authChecks) { + $authResult = Invoke-ClientOperationExpectingFailure ` + -Client $Client -Operation "open-session" -ApiKeyEnvName $authCheck.apiKeyEnv + $passed = $authResult.exitCode -ne 0 + $clientResult.auth += [ordered]@{ + check = $authCheck.check + exitCode = $authResult.exitCode + passed = $passed + } + if (-not $passed) { + throw "$Client auth check '$($authCheck.check)' expected open-session to be rejected but it exited 0." + } + } + } + } catch { + $clientResult.error = $_.Exception.Message + Write-Warning "$Client e2e flow failed: $($clientResult.error)" + } finally { + if (-not [string]::IsNullOrWhiteSpace($sessionId)) { + try { + Invoke-ClientOperation -Client $Client -Operation "close-session" -Values @{ + sessionId = $sessionId + } | Out-Null + $clientResult.closed = $true + } catch { + $clientResult.error = "$($clientResult.error) close-session failed: $($_.Exception.Message)" + } + } + } + + return $clientResult +} + +# Forwards every run parameter to a single-client child invocation used by +# -Parallel. -Parallel itself is intentionally omitted so the child runs the +# serial path. +function Get-ChildArgumentList { + param( + [string]$Client, + [string]$ChildReportPath + ) + + $childArgs = @( + "-NoProfile", "-ExecutionPolicy", "Bypass", "-File", $PSCommandPath, + "-Clients", $Client, + "-MachineStart", "$MachineStart", + "-MachineEnd", "$MachineEnd", + "-Attributes", ($Attributes -join ","), + "-Endpoint", $Endpoint, + "-ApiKeyEnv", $ApiKeyEnv, + "-SqlServer", $SqlServer, + "-Database", $Database, + "-EventLimit", "$EventLimit", + "-BulkTagCount", "$BulkTagCount", + "-WriteAttribute", $WriteAttribute, + "-WriteType", $WriteType, + "-WriteValueBase", "$WriteValueBase", + "-WriteEchoMaxEvents", "$WriteEchoMaxEvents", + "-ReportPath", $ChildReportPath, + "-EmitReport" + ) + if (-not [string]::IsNullOrWhiteSpace($RejectScopeApiKeyEnv)) { + $childArgs += @("-RejectScopeApiKeyEnv", $RejectScopeApiKeyEnv) + } + if ($SkipStream) { $childArgs += "-SkipStream" } + if ($SkipBulk) { $childArgs += "-SkipBulk" } + if ($SkipWrite) { $childArgs += "-SkipWrite" } + if ($SkipParity) { $childArgs += "-SkipParity" } + if ($SkipAuth) { $childArgs += "-SkipAuth" } + if ($DryRun) { $childArgs += "-DryRun" } + return $childArgs +} + +# An env var name that is guaranteed not to be set in this process, used to +# drive the missing-API-key auth rejection. +$script:missingKeyEnvName = "MXGW_E2E_MISSING_KEY_" + ([guid]::NewGuid().ToString("N")) + +# --- Parallel mode: fan out one isolated child process per client ---------- +if ($Parallel -and $Clients.Count -gt 1) { + Write-Host "Running $($Clients.Count) client e2e flows in parallel." + $childRoot = Join-Path ([System.IO.Path]::GetTempPath()) ("mxgw-e2e-" + ([guid]::NewGuid().ToString("N"))) + New-Item -ItemType Directory -Path $childRoot -Force | Out-Null + + $children = @() + foreach ($client in $Clients) { + $childReport = Join-Path $childRoot "$client.json" + $childLog = Join-Path $childRoot "$client.log" + $childArgs = Get-ChildArgumentList -Client $client -ChildReportPath $childReport + $process = Start-Process -FilePath "pwsh" -ArgumentList $childArgs ` + -RedirectStandardOutput $childLog -RedirectStandardError "$childLog.err" ` + -NoNewWindow -PassThru + $children += [pscustomobject]@{ + client = $client + process = $process + report = $childReport + log = $childLog + } + } + + foreach ($child in $children) { + $child.process.WaitForExit() + } + + $mergedClients = @() + $discoveredTags = @() + $hadFailure = $false + foreach ($child in $children) { + foreach ($logPath in @($child.log, "$($child.log).err")) { + if ((Test-Path $logPath) -and -not [string]::IsNullOrWhiteSpace((Get-Content -Raw -Path $logPath))) { + Write-Host "----- $($child.client) -----" + Get-Content -Path $logPath | ForEach-Object { Write-Host $_ } + } + } + + if ($child.process.ExitCode -ne 0) { + $hadFailure = $true + } + + if (Test-Path $child.report) { + $childRun = Get-Content -Raw -Path $child.report | ConvertFrom-Json + $mergedClients += @($childRun.clients) + if ($discoveredTags.Count -eq 0) { + $discoveredTags = @($childRun.discoveredTags) + } + } else { + $hadFailure = $true + $mergedClients += [pscustomobject]@{ + language = $child.client + error = "Child process exited $($child.process.ExitCode) without writing a report." + } + } + } + + $run = [ordered]@{ + schemaVersion = 1 + endpoint = $Endpoint + apiKeyEnv = $ApiKeyEnv + machineStart = $MachineStart + machineEnd = $MachineEnd + attributes = $Attributes + eventLimit = $EventLimit + bulkTagCount = $BulkTagCount + skipStream = [bool]$SkipStream + skipBulk = [bool]$SkipBulk + skipWrite = [bool]$SkipWrite + skipParity = [bool]$SkipParity + skipAuth = [bool]$SkipAuth + writeAttribute = $WriteAttribute + parallel = $true + discoveredTags = $discoveredTags + clients = $mergedClients + completedAt = (Get-Date).ToUniversalTime().ToString("O") + success = -not $hadFailure + } + + $reportDirectory = Split-Path -Parent $ReportPath + if (-not [string]::IsNullOrWhiteSpace($reportDirectory)) { + New-Item -ItemType Directory -Path $reportDirectory -Force | Out-Null + } + $run | ConvertTo-Json -Depth 8 | Set-Content -Path $ReportPath -Encoding UTF8 + Write-Host "Wrote merged e2e report to $ReportPath" + Remove-Item -Path $childRoot -Recurse -Force -ErrorAction SilentlyContinue + + if ($hadFailure) { + exit 1 + } + return +} + +# --- Serial mode ----------------------------------------------------------- $discoveryJson = & $discoveryScript ` -MachineStart $MachineStart ` -MachineEnd $MachineEnd ` @@ -534,129 +1101,32 @@ $run = [ordered]@{ bulkTagCount = $BulkTagCount skipStream = [bool]$SkipStream skipBulk = [bool]$SkipBulk + skipWrite = [bool]$SkipWrite + skipParity = [bool]$SkipParity + skipAuth = [bool]$SkipAuth + writeAttribute = $WriteAttribute + parallel = $false startedAt = (Get-Date).ToUniversalTime().ToString("O") discoveredTags = $tags clients = @() } $hadFailure = $false +$script:clientFlowIndex = 0 foreach ($client in $Clients) { - Write-Host "Running $client client e2e flow against $($tags.Count) discovered tags." - $sessionId = $null - $serverHandle = $null - $clientResult = [ordered]@{ - language = $client - sessionId = $null - serverHandle = $null - bulk = $null - addedItems = @() - eventCount = 0 - closed = $false - error = $null - } - - try { - $openJson = Invoke-ClientOperation -Client $client -Operation "open-session" - $sessionId = Get-OpenSessionId -Client $client -Json $openJson - if ([string]::IsNullOrWhiteSpace($sessionId)) { - throw "The $client open-session command did not return a session id." - } - $clientResult.sessionId = $sessionId - - $registerJson = Invoke-ClientOperation -Client $client -Operation "register" -Values @{ - sessionId = $sessionId - } - $serverHandle = Get-ServerHandle -Client $client -Json $registerJson - $clientResult.serverHandle = $serverHandle - - if (-not $SkipBulk) { - $bulkTags = @($tags | Select-Object -First ([Math]::Min($BulkTagCount, $tags.Count))) - $bulkItems = ($bulkTags | ForEach-Object { $_.fullTagReference }) -join "," - $subscribeBulkJson = Invoke-ClientOperation -Client $client -Operation "subscribe-bulk" -Values @{ - sessionId = $sessionId - serverHandle = $serverHandle - items = $bulkItems - } - $subscribeResults = @(Get-BulkResults -Client $client -Operation "subscribe-bulk" -Json $subscribeBulkJson) - Assert-BulkResults -Client $client -Operation "subscribe-bulk" -Results $subscribeResults -ExpectedCount $bulkTags.Count - $bulkItemHandles = @(Get-BulkItemHandles -Results $subscribeResults) - if ($bulkItemHandles.Count -ne $bulkTags.Count) { - throw "$client subscribe-bulk returned $($bulkItemHandles.Count) usable item handle(s); expected $($bulkTags.Count)." - } - - $unsubscribeBulkJson = Invoke-ClientOperation -Client $client -Operation "unsubscribe-bulk" -Values @{ - sessionId = $sessionId - serverHandle = $serverHandle - itemHandles = $bulkItemHandles -join "," - } - $unsubscribeResults = @(Get-BulkResults -Client $client -Operation "unsubscribe-bulk" -Json $unsubscribeBulkJson) - Assert-BulkResults -Client $client -Operation "unsubscribe-bulk" -Results $unsubscribeResults -ExpectedCount $bulkItemHandles.Count - - $clientResult.bulk = [ordered]@{ - tagCount = $bulkTags.Count - subscribedCount = $subscribeResults.Count - unsubscribedCount = $unsubscribeResults.Count - itemHandles = $bulkItemHandles - } - } - - foreach ($tag in $tags) { - $addJson = Invoke-ClientOperation -Client $client -Operation "add-item" -Values @{ - sessionId = $sessionId - serverHandle = $serverHandle - item = $tag.fullTagReference - } - $itemHandle = Get-ItemHandle -Client $client -Json $addJson - Invoke-ClientOperation -Client $client -Operation "advise" -Values @{ - sessionId = $sessionId - serverHandle = $serverHandle - itemHandle = $itemHandle - } | Out-Null - - $clientResult.addedItems += [ordered]@{ - tagName = $tag.tagName - attributeName = $tag.attributeName - fullTagReference = $tag.fullTagReference - itemHandle = $itemHandle - protectedWriteRequired = $tag.attributeName -eq "ProtectedValue" - } - } - - if (-not $SkipStream) { - $streamJson = Invoke-ClientOperation -Client $client -Operation "stream-events" -Values @{ - sessionId = $sessionId - } - $clientResult.eventCount = Get-StreamEventCount -Client $client -Json $streamJson - if ($clientResult.eventCount -lt 1) { - throw "The $client stream-events command returned no events." - } - } - } catch { + $clientResult = Invoke-ClientFlow -Client $client -Tags $tags + if (-not [string]::IsNullOrWhiteSpace($clientResult.error)) { $hadFailure = $true - $clientResult.error = $_.Exception.Message - Write-Warning "$client e2e flow failed: $($clientResult.error)" - } finally { - if (-not [string]::IsNullOrWhiteSpace($sessionId)) { - try { - Invoke-ClientOperation -Client $client -Operation "close-session" -Values @{ - sessionId = $sessionId - } | Out-Null - $clientResult.closed = $true - } catch { - $hadFailure = $true - $clientResult.error = "$($clientResult.error) close-session failed: $($_.Exception.Message)" - } - } } - $run.clients += $clientResult + $script:clientFlowIndex++ } $run.completedAt = (Get-Date).ToUniversalTime().ToString("O") $run.success = -not $hadFailure -if (-not $DryRun) { +if (-not $DryRun -or $EmitReport) { $reportDirectory = Split-Path -Parent $ReportPath if (-not [string]::IsNullOrWhiteSpace($reportDirectory)) { New-Item -ItemType Directory -Path $reportDirectory -Force | Out-Null