From 758aca23555752a2e37f8a20945c658e3fc02a14 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 19 May 2026 14:45:47 -0400 Subject: [PATCH] Make the e2e write phase work live across all five clients MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Running the matrix against a live gateway surfaced several issues: - The write phase is now opt-in (-VerifyWrite, was -SkipWrite). It runs right after register so only a small event backlog precedes the write, and asserts the reliable OnWriteComplete signal (the written value is not echoed back by a provider-driven attribute like TestChangingInt, so the value compare is best-effort). - Java was launched as bare "gradle", which .NET's Process.Start cannot exec (it is gradle.bat) — resolve the launcher and run it via cmd.exe. - The Java client's MxEventStream queue capacity was 16, which overflows on any active session's backlog-replay burst; raised to 1024. - The Rust stream-events CLI now renders the event family as the proto enum name, matching the protobuf-JSON the other four clients emit. Update docs/GatewayTesting.md for the reworked write phase. Verified live: the full five-client matrix passes with -VerifyWrite. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../mxgateway/client/MxGatewayClient.java | 7 +- clients/rust/crates/mxgw-cli/src/main.rs | 9 +- docs/GatewayTesting.md | 44 ++-- scripts/run-client-e2e-tests.ps1 | 203 +++++++++++------- 4 files changed, 168 insertions(+), 95 deletions(-) diff --git a/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClient.java b/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClient.java index 6aa42c4..a5182d2 100644 --- a/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClient.java +++ b/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClient.java @@ -260,7 +260,12 @@ public final class MxGatewayClient implements AutoCloseable { * @return an iterator-style stream of events */ public MxEventStream streamEvents(StreamEventsRequest request) { - MxEventStream stream = new MxEventStream(16); + // The buffer must absorb the gateway's session-backlog replay burst, + // which arrives far faster than the iterator drains it. A small queue + // overflows on any moderately active session; 1024 covers a realistic + // backlog while still bounding memory and preserving overflow + // detection for a genuinely unbounded stream. + MxEventStream stream = new MxEventStream(1024); MxGatewayChannels.withStreamDeadline(rawAsyncStub(), options).streamEvents(request, stream.observer()); return stream; } diff --git a/clients/rust/crates/mxgw-cli/src/main.rs b/clients/rust/crates/mxgw-cli/src/main.rs index 6f71836..6ed4052 100644 --- a/clients/rust/crates/mxgw-cli/src/main.rs +++ b/clients/rust/crates/mxgw-cli/src/main.rs @@ -17,7 +17,7 @@ use clap::{Args, Parser, Subcommand, ValueEnum}; use futures_util::StreamExt; use mxgateway_client::generated::galaxy_repository::v1::DeployEvent; use mxgateway_client::generated::mxaccess_gateway::v1::{ - CloseSessionRequest, MxCommand, MxCommandKind, MxCommandRequest, MxEvent, + CloseSessionRequest, MxCommand, MxCommandKind, MxCommandRequest, MxEvent, MxEventFamily, MxValue as ProtoMxValue, OpenSessionRequest, PingCommand, StreamEventsRequest, }; use mxgateway_client::{ @@ -842,8 +842,13 @@ fn print_deploy_event(event: &DeployEvent, use_json: bool) { /// matrix can extract and compare event values uniformly across all five /// client CLIs. fn event_to_json(event: &MxEvent) -> Value { + // Render the family as the proto enum name (e.g. MX_EVENT_FAMILY_ON_WRITE_COMPLETE) + // so it matches the protobuf-JSON the .NET/Go/Java/Python CLIs emit. + let family = MxEventFamily::try_from(event.family) + .map(|family| family.as_str_name().to_owned()) + .unwrap_or_else(|_| event.family.to_string()); json!({ - "family": event.family, + "family": family, "sessionId": event.session_id, "serverHandle": event.server_handle, "itemHandle": event.item_handle, diff --git a/docs/GatewayTesting.md b/docs/GatewayTesting.md index 3768f4f..b0bf7a4 100644 --- a/docs/GatewayTesting.md +++ b/docs/GatewayTesting.md @@ -180,19 +180,30 @@ path and writes a JSON report under `artifacts/e2e/`: 2. **Bulk** — verifies `SubscribeBulk` / `UnsubscribeBulk` on a bounded tag subset (skip with `-SkipBulk`). 3. **Add-item / advise** — adds and advises every discovered test tag. -4. **Write round-trip** — writes a per-client sentinel value to a configurable - writable attribute (`-WriteAttribute`, default `TestChangingInt`), then - asserts the same value is echoed back through the event stream. Skip with - `-SkipWrite`. The Rust `stream-events` CLI emits full per-event JSON - (`itemHandle` + `value`) so all five clients run an identical value compare. -5. **Stream** — asserts a bounded event stream delivers at least one event +4. **Stream** — asserts a bounded event stream delivers at least one event (skip with `-SkipStream`). -6. **Parity** — asserts MXAccess error paths are rejected rather than silently +5. **Parity** — asserts MXAccess error paths are rejected rather than silently succeeding: an invalid item handle and an unknown session id (skip with `-SkipParity`). -7. **Auth rejection** — asserts `open-session` is rejected when the API key is +6. **Auth rejection** — asserts `open-session` is rejected when the API key is missing, and (when `-RejectScopeApiKeyEnv` names an insufficient-scope key) when the key lacks the required scope. Skip with `-SkipAuth`. +7. **Write round-trip** — *opt-in (`-VerifyWrite`).* Runs right after + `register`: adds and advises a configurable writable attribute + (`-WriteAttribute`, default `TestChangingInt`), writes a per-client + sentinel value, then streams events and asserts an `OnWriteComplete` event + for that item is observed — proof the write round-tripped through the + gateway, worker, and MXAccess provider. The written value being echoed back + in an `OnDataChange` is recorded best-effort (`echoObserved`): a + provider-driven attribute such as `TestChangingInt` accepts the write but + immediately overwrites it, so no data-change carries the value back. The + Rust `stream-events` CLI emits full per-event JSON (`family`, `itemHandle`, + `value`) so all five clients apply the same checks. + + It is opt-in because it mutates live tag state. The phase fails fast if the + write command is rejected — e.g. against a gateway whose worker predates + write support (`MxAccessCommandExecutor` returning `InvalidRequest` for + `Write`/`Write2`/`WriteSecured`/`WriteSecured2`). Build the gateway and worker, start the gateway, and provide a valid API key before running the client e2e script: @@ -209,9 +220,9 @@ powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -Clien powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -BulkTagCount 10 powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -SkipStream powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -SkipBulk -# Write round-trip: point at a writable scalar attribute and its value type. -powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -WriteAttribute TestChangingInt -WriteType int32 -powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -SkipWrite +# Write round-trip (opt-in): point at a writable scalar attribute and its +# value type. +powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -VerifyWrite -WriteAttribute TestChangingInt -WriteType int32 # Auth rejection: also assert an insufficient-scope key is denied. powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -RejectScopeApiKeyEnv MXGATEWAY_READONLY_API_KEY # Run all five clients concurrently as isolated child processes. @@ -221,11 +232,12 @@ powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -DryRu powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -Endpoint localhost:5000 -ApiKeyEnv MXGATEWAY_API_KEY ``` -The write round-trip fails loudly if `-WriteAttribute` does not name a writable -scalar attribute, if the write is rejected, or if the sentinel value is not -observed within `-WriteEchoMaxEvents` (default 200) streamed events. Point -`-WriteAttribute` at a stable writable attribute, raise `-WriteEchoMaxEvents`, -or pass `-SkipWrite` if no suitable attribute is deployed. +When `-VerifyWrite` is enabled, the write round-trip fails loudly if the write +command is rejected, if `-WriteAttribute` does not name a writable scalar +attribute, or if no `OnWriteComplete` event is observed for the written item +within `-WriteEchoMaxEvents` (default 200) streamed events. Raise +`-WriteEchoMaxEvents` if the gateway's per-session event backlog is large +enough to push `OnWriteComplete` past that bound. ## Focused Commands diff --git a/scripts/run-client-e2e-tests.ps1 b/scripts/run-client-e2e-tests.ps1 index 47274f8..a260e75 100644 --- a/scripts/run-client-e2e-tests.ps1 +++ b/scripts/run-client-e2e-tests.ps1 @@ -33,8 +33,10 @@ param( [int]$BulkTagCount = 6, [switch]$SkipStream, [switch]$SkipBulk, - # Write round-trip + value assertion. - [switch]$SkipWrite, + # Write round-trip. Opt-in because it mutates live tag state: it writes a + # sentinel value to -WriteAttribute and asserts an OnWriteComplete event + # confirms the write reached the MXAccess provider. + [switch]$VerifyWrite, [string]$WriteAttribute = "TestChangingInt", [string]$WriteType = "int32", [int]$WriteValueBase = 424200, @@ -335,6 +337,14 @@ function Get-EventItemHandle { return [int]$handle } +# Extracts the event family as a string. All five CLIs render it as the +# protobuf enum name (e.g. MX_EVENT_FAMILY_ON_WRITE_COMPLETE). +function Get-EventFamily { + param([object]$Event) + + return [string](Get-PropertyValue -Object $Event -Names @("family")) +} + # Extracts the scalar payload from a streamed event's MxValue as a string. # The MxValue oneof renders to one protobuf-JSON `*Value` key; all five # CLIs (after the Rust stream-events extension) emit the same key names. @@ -595,7 +605,20 @@ function Get-ClientCommand { $cliArgs += @("--session-id", $Values.sessionId) } $arguments = @("--quiet", ":mxgateway-cli:run", "--args=$($cliArgs -join ' ')") - return [pscustomobject]@{ file = "gradle"; args = $arguments; cwd = (Join-Path $repoRoot "clients/java"); env = @{} } + # Gradle ships as gradle.bat on Windows; .NET's Process.Start + # (UseShellExecute=false) cannot launch a batch file directly, so + # resolve the launcher and run it through cmd.exe. + $gradleCommand = Get-Command "gradle.bat", "gradle.cmd", "gradle.exe", "gradle" ` + -ErrorAction SilentlyContinue | Select-Object -First 1 + if ($null -eq $gradleCommand) { + throw "The 'gradle' command was not found on PATH; the Java client e2e flow requires Gradle." + } + return [pscustomobject]@{ + file = "cmd.exe" + args = @("/c", $gradleCommand.Source) + $arguments + cwd = (Join-Path $repoRoot "clients/java") + env = @{} + } } } } @@ -627,23 +650,30 @@ function Get-DryRunReply { return [pscustomobject]@{ unsubscribeBulk = [pscustomobject]@{ results = $results }; results = $results } } "stream-events" { - # Echo the requested write value back so the write round-trip - # assertion passes under -DryRun. The reply is shaped per client: - # Go and Java emit one event object per line (Read-JsonObject - # collapses NDJSON to a bare array), the others aggregate the - # events under an `events` property. + # Synthesize an OnDataChange (carrying the written value) and an + # OnWriteComplete so the write round-trip assertion passes under + # -DryRun. The reply is shaped per client: Go and Java emit one + # event object per line (Read-JsonObject collapses NDJSON to a + # bare array), the others aggregate the events under `events`. $itemHandle = if ($Values.ContainsKey("echoItemHandle")) { [int]$Values.echoItemHandle } else { 1 } $echoValue = if ($Values.ContainsKey("echoValue")) { $Values.echoValue } else { 1 } - $event = [pscustomobject]@{ + $dataEvent = [pscustomobject]@{ workerSequence = 1 + family = "MX_EVENT_FAMILY_ON_DATA_CHANGE" itemHandle = $itemHandle value = [pscustomobject]@{ int32Value = $echoValue } } + $writeCompleteEvent = [pscustomobject]@{ + workerSequence = 2 + family = "MX_EVENT_FAMILY_ON_WRITE_COMPLETE" + itemHandle = $itemHandle + } + $events = @($dataEvent, $writeCompleteEvent) switch ($Client) { - "go" { return ,@($event) } - "java" { return ,@($event) } - "rust" { return [pscustomobject]@{ eventCount = 1; events = @($event) } } - default { return [pscustomobject]@{ events = @($event) } } + "go" { return ,$events } + "java" { return ,$events } + "rust" { return [pscustomobject]@{ eventCount = $events.Count; events = $events } } + default { return [pscustomobject]@{ events = $events } } } } default { return [pscustomobject]@{ ok = $true; reply = [pscustomobject]@{} } } @@ -735,6 +765,83 @@ function Invoke-ClientFlow { $serverHandle = Get-ServerHandle -Client $Client -Json $registerJson $clientResult.serverHandle = $serverHandle + # --- Write round-trip + value assertion --------------------------- + # Runs right after register, before the bulk and add-item phases, so + # only a small backlog of events precedes the write. The gateway + # replays the per-session event buffer from the start, so the + # post-write OnWriteComplete must be reachable within the bounded + # -WriteEchoMaxEvents window. + if ($VerifyWrite) { + $writeTag = @($Tags | Where-Object { + $_.attributeName -eq $WriteAttribute + }) | Select-Object -First 1 + + if ($null -eq $writeTag) { + Write-Warning "$Client write phase skipped: no discovered tag has attribute '$WriteAttribute'." + } else { + $writeAddJson = Invoke-ClientOperation -Client $Client -Operation "add-item" -Values @{ + sessionId = $sessionId + serverHandle = $serverHandle + item = $writeTag.fullTagReference + } + $writeItemHandle = Get-ItemHandle -Client $Client -Json $writeAddJson + Invoke-ClientOperation -Client $Client -Operation "advise" -Values @{ + sessionId = $sessionId + serverHandle = $serverHandle + itemHandle = $writeItemHandle + } | Out-Null + + $sentinelValue = "$($WriteValueBase + $script:clientFlowIndex)" + Invoke-ClientOperation -Client $Client -Operation "write" -Values @{ + sessionId = $sessionId + serverHandle = $serverHandle + itemHandle = $writeItemHandle + valueType = $WriteType + value = $sentinelValue + } | Out-Null + + $writeStreamJson = Invoke-ClientOperation -Client $Client -Operation "stream-events" -Values @{ + sessionId = $sessionId + maxEvents = $WriteEchoMaxEvents + echoItemHandle = $writeItemHandle + echoValue = $sentinelValue + } + $writeEvents = @(Get-StreamEvents -Client $Client -Json $writeStreamJson) + $writeItemEvents = @($writeEvents | Where-Object { + (Get-EventItemHandle -Event $_) -eq $writeItemHandle + }) + + # The reliable write round-trip signal: MXAccess fires + # OnWriteComplete once the write reaches the provider. The + # value echo is best-effort — a provider-driven attribute + # (e.g. a simulated counter) accepts the write but does not + # hold the value, so no OnDataChange carries it back. + $writeCompleteEvent = $writeItemEvents | Where-Object { + (Get-EventFamily -Event $_) -match "WRITE_COMPLETE" + } | Select-Object -First 1 + $echoEvent = $writeItemEvents | Where-Object { + Test-ValueEquals -Expected $sentinelValue -Observed (Get-EventScalar -Event $_) + } | Select-Object -First 1 + + if ($null -eq $writeCompleteEvent) { + throw ("$Client write round-trip failed: wrote $WriteType=$sentinelValue to " + + "'$($writeTag.fullTagReference)' (item handle $writeItemHandle) but no " + + "OnWriteComplete event was observed in $($writeEvents.Count) streamed event(s). " + + "Increase -WriteEchoMaxEvents, or drop -VerifyWrite.") + } + + $clientResult.write = [ordered]@{ + attributeName = $WriteAttribute + fullTagReference = $writeTag.fullTagReference + itemHandle = $writeItemHandle + valueType = $WriteType + value = $sentinelValue + writeCompleteObserved = $true + echoObserved = ($null -ne $echoEvent) + } + } + } + if (-not $SkipBulk) { $bulkTags = @($Tags | Select-Object -First ([Math]::Min($BulkTagCount, $Tags.Count))) $bulkItems = ($bulkTags | ForEach-Object { $_.fullTagReference }) -join "," @@ -788,71 +895,15 @@ function Invoke-ClientFlow { } } - # --- Write round-trip + value assertion --------------------------- - # Write a per-client sentinel value to a configured writable - # attribute, then assert it is echoed back through the event stream. - $writeTarget = $null - if (-not $SkipWrite) { - $writeTarget = @($clientResult.addedItems | Where-Object { - $_.attributeName -eq $WriteAttribute - }) | Select-Object -First 1 - } - - $doWrite = $null -ne $writeTarget - $sentinelValue = $null - if ($doWrite) { - $sentinelValue = "$($WriteValueBase + $script:clientFlowIndex)" - Invoke-ClientOperation -Client $Client -Operation "write" -Values @{ + # --- Event streaming ---------------------------------------------- + if (-not $SkipStream) { + $streamJson = Invoke-ClientOperation -Client $Client -Operation "stream-events" -Values @{ sessionId = $sessionId - serverHandle = $serverHandle - itemHandle = $writeTarget.itemHandle - valueType = $WriteType - value = $sentinelValue - } | Out-Null - } elseif (-not $SkipWrite) { - Write-Warning "$Client write phase skipped: no discovered tag has attribute '$WriteAttribute'." - } - - # --- Event streaming (also serves the write echo assertion) ------- - $captureEvents = (-not $SkipStream) -or $doWrite - if ($captureEvents) { - $streamValues = @{ sessionId = $sessionId } - if ($doWrite) { - $streamValues.maxEvents = $WriteEchoMaxEvents - $streamValues.echoItemHandle = $writeTarget.itemHandle - $streamValues.echoValue = $sentinelValue } - $streamJson = Invoke-ClientOperation -Client $Client -Operation "stream-events" -Values $streamValues - $events = @(Get-StreamEvents -Client $Client -Json $streamJson) $clientResult.eventCount = Get-StreamEventCount -Client $Client -Json $streamJson - - if (-not $SkipStream -and $clientResult.eventCount -lt 1) { + if ($clientResult.eventCount -lt 1) { throw "The $Client stream-events command returned no events." } - - if ($doWrite) { - $echoEvent = $events | Where-Object { - (Get-EventItemHandle -Event $_) -eq $writeTarget.itemHandle -and - (Test-ValueEquals -Expected $sentinelValue -Observed (Get-EventScalar -Event $_)) - } | Select-Object -First 1 - - if ($null -eq $echoEvent) { - throw ("$Client write round-trip failed: wrote $WriteType=$sentinelValue to " + - "'$($writeTarget.fullTagReference)' (item handle $($writeTarget.itemHandle)) " + - "but no matching value was observed in $($events.Count) streamed event(s). " + - "Increase -WriteEchoMaxEvents, point -WriteAttribute at a writable attribute, or pass -SkipWrite.") - } - - $clientResult.write = [ordered]@{ - attributeName = $WriteAttribute - fullTagReference = $writeTarget.fullTagReference - itemHandle = $writeTarget.itemHandle - valueType = $WriteType - value = $sentinelValue - echoObserved = $true - echoWorkerSequence = (Get-PropertyValue -Object $echoEvent -Names @("workerSequence", "worker_sequence")) - } - } } # --- Error-path (parity) checks ----------------------------------- @@ -965,7 +1016,7 @@ function Get-ChildArgumentList { } if ($SkipStream) { $childArgs += "-SkipStream" } if ($SkipBulk) { $childArgs += "-SkipBulk" } - if ($SkipWrite) { $childArgs += "-SkipWrite" } + if ($VerifyWrite) { $childArgs += "-VerifyWrite" } if ($SkipParity) { $childArgs += "-SkipParity" } if ($SkipAuth) { $childArgs += "-SkipAuth" } if ($DryRun) { $childArgs += "-DryRun" } @@ -1043,7 +1094,7 @@ if ($Parallel -and $Clients.Count -gt 1) { bulkTagCount = $BulkTagCount skipStream = [bool]$SkipStream skipBulk = [bool]$SkipBulk - skipWrite = [bool]$SkipWrite + verifyWrite = [bool]$VerifyWrite skipParity = [bool]$SkipParity skipAuth = [bool]$SkipAuth writeAttribute = $WriteAttribute @@ -1101,7 +1152,7 @@ $run = [ordered]@{ bulkTagCount = $BulkTagCount skipStream = [bool]$SkipStream skipBulk = [bool]$SkipBulk - skipWrite = [bool]$SkipWrite + verifyWrite = [bool]$VerifyWrite skipParity = [bool]$SkipParity skipAuth = [bool]$SkipAuth writeAttribute = $WriteAttribute